Importation des bibliothéques nécessaires

In [56]:
import pandas as pd 
import pyodbc 
import os
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy import (
    create_engine, Column, Integer, String, Float, ForeignKey, Table, MetaData
)
from sqlalchemy.orm import declarative_base
import requests
from geopy.geocoders import Nominatim
import aiohttp
import asyncio


Extract function

In [57]:

def extract():
    try:
        #Fetching sales_2022 data from the database
        cnx = pyodbc.connect(
            'DRIVER={SQL Server};SERVER=server_name;DATABASE=database_name;Trusted_Connection=yes;'
        )
        sales_2022_data = pd.read_sql(
            'SELECT sale_id, product_id, customer_id, quantity, sale_date, price FROM sales2022', 
            cnx
        )
        print("Successfully fetched sales_2022 data.")
    except pyodbc.Error as e:
        print(f"Database error: {e}")
        sales_2022_data = None
    except Exception as ex:
        print(f"General error: {ex}")
        sales_2022_data = None
    finally:
        if 'cnx' in locals() and cnx is not None:
            cnx.close()

    try:
        #Fetching sales_2023 data from the csv file
        sales_2023_data = pd.read_csv('./data/raw/sales_2023.csv')
        print("Successfully loaded sales_2023 data.")
    except Exception as ex:
        print(f"Error loading sales_2023 data: {ex}")
        sales_2023_data = None

    try:
        #Fetching customer data from the csv file
        customer_data = pd.read_csv('./data/raw/customer_data.csv')
        print("Successfully loaded customer data.")
    except Exception as ex:
        print(f"Error loading customer data: {ex}")
        customer_data = None

    try:
        #Fetching product data from the json file
        product_data = pd.read_json('./data/raw/products_data.json')
        print("Successfully loaded product data.")
    except Exception as ex:
        print(f"Error loading product data: {ex}")
        product_data = None

    return sales_2022_data, sales_2023_data, customer_data, product_data


Transform function

In [58]:

async def get_country_from_city_async(city):
    async with aiohttp.ClientSession() as session:
        async with session.get(f'https://api.opencagedata.com/geocode/v1/json?q={city}&key=9f69324b1cdb4efebd80d2b31d24920b') as response:
            data = await response.json()
            if data['status']['code'] == 200:
                country = data['results'][0]['components'].get('country')
                return city, country
            return city, None

async def get_all_countries(cities):
    tasks = [get_country_from_city_async(city) for city in cities]
    return await asyncio.gather(*tasks)


async def transform(sales_2022, sales_2023, product_data, customer_data):
    
    #Convert sale_date to consistent format in sales_2022
    sales_2022['sale_date'] = pd.to_datetime(
        sales_2022['sale_date'], 
        errors='coerce').dt.date

    #Merge sales_2022 and sales_2023
    sales_data = pd.concat([sales_2022, sales_2023], axis=0)
    sales_data.drop(columns=['sale_id'], inplace=True)

    #Merge with product_data
    sales_data = sales_data.merge(
        product_data[['product_id', 'price', 'cost']],
        on='product_id',
        suffixes=('_sale', '_product')
    )

    #Calculate revenue and profit
    sales_data['revenue'] = sales_data['price_sale'] * sales_data['quantity']
    sales_data['profit'] = sales_data['revenue'] - (sales_data['cost'] * sales_data['quantity'])

    #Create time dimension from sales_data
    unique_dates = sales_data['sale_date'].dropna().unique()
    date_range = pd.to_datetime(unique_dates)
    time_dim = pd.DataFrame({'date': date_range})
    time_dim['date_id'] = range(1, len(time_dim) + 1)
    time_dim['year'] = time_dim['date'].dt.year
    time_dim['month'] = time_dim['date'].dt.month
    time_dim['day'] = time_dim['date'].dt.day

    #Merging time dimension with sales_data
    sales_data['sale_date'] = pd.to_datetime(sales_data['sale_date'], errors='coerce')
    sales_data = pd.merge(
        sales_data,
        time_dim[['date', 'date_id']],
        left_on='sale_date',
        right_on='date',
        how='left'
    )
    sales_data.drop(columns=['sale_date', 'date'], inplace=True)
    time_dim.drop(columns=['date'],inplace=True)


    # Enriching the customer_data with the country
    cities = customer_data['city'].unique()
    countries = await get_all_countries(cities)
    countries_dict = dict(countries)
    customer_data['country'] = customer_data['city'].apply(lambda city: countries_dict.get(city, None))

    #Create location dimension with unique city and country combinations
    location_dim = customer_data[['city', 'country']].drop_duplicates()

    # Assign a unique location_id for each unique city-country combination
    location_dim['location_id'] = range(1, len(location_dim) + 1)

    # Merge location_dim with customer_data to get customer_id associated with location_id
    customer_location_mapping = customer_data[['customer_id', 'city', 'country']]
    customer_location_mapping = pd.merge(customer_location_mapping, location_dim, on=['city', 'country'], how='left')

    # Merge sales_data with the customer_location_mapping to add location_id
    sales_data = pd.merge(
        sales_data, 
        customer_location_mapping[['customer_id', 'location_id']], 
        on='customer_id', 
        how='left'
    )

    # Finalize location_dim (city and country are already unique)
    location_dim = location_dim[['location_id', 'city', 'country']]

    # Create product dimension
    product_dim = product_data[['product_id', 'product_name', 'category']].drop_duplicates()

    # Create customer dimension
    customer_dim = customer_data[['customer_id', 'customer_name', 'age', 'gender']].drop_duplicates()
    
    # Keep only specific columns in the sales_data DataFrame
    sales_data = sales_data[['customer_id', 'product_id', 'location_id', 'date_id', 'quantity', 'revenue', 'cost']]
    sales_data['sale_id'] = range(1, len(sales_data) + 1)

    sales_data = sales_data[['sale_id', 'customer_id', 'product_id', 'location_id', 'date_id', 'quantity', 'revenue', 'cost']]
    customer_dim['gender'] = customer_dim['gender'].map({'Male': 0, 'Female': 1})
    return sales_data, product_dim, customer_dim, location_dim, time_dim


Load function

In [59]:
Base = declarative_base()

class ProductDim(Base):
    __tablename__ = 'product_dim'
    product_id = Column(Integer, primary_key=True)
    product_name = Column(String(255))
    category = Column(String(100))


class CustomerDim(Base):
    __tablename__ = 'customer_dim'
    customer_id = Column(Integer, primary_key=True)
    customer_name = Column(String(255))
    age = Column(Integer)
    gender = Column(String(10))


class LocationDim(Base):
    __tablename__ = 'location_dim'
    location_id = Column(Integer, primary_key=True)
    city = Column(String(100))
    country = Column(String(100))


class TimeDim(Base):
    __tablename__ = 'time_dim'
    date_id = Column(Integer, primary_key=True)
    year = Column(Integer)
    month = Column(Integer)
    day = Column(Integer)


class SalesFact(Base):
    __tablename__ = 'sales_fact'
    sale_id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey('customer_dim.customer_id'))
    product_id = Column(Integer, ForeignKey('product_dim.product_id'))
    location_id = Column(Integer, ForeignKey('location_dim.location_id'))
    date_id = Column(Integer, ForeignKey('time_dim.date_id'))
    quantity = Column(Integer)
    revenue = Column(Float)
    cost = Column(Float)


def load(sales_fact, product_dim, customer_dim, location_dim, time_dim):
    try:
        #SQLAlchemy connection string
        engine = create_engine(
            'mssql+pyodbc://server_name/test?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes'
        )

        #Verify connection
        with engine.connect() as conn:
            print("Connected to the database successfully.")

        #Create tables
        Base.metadata.create_all(engine)

        #Load data into tables
        tables = {
            SalesFact: sales_fact,
            ProductDim: product_dim,
            CustomerDim: customer_dim,
            LocationDim: location_dim,
            TimeDim: time_dim
        }

        for table_class, df in tables.items():
            df.to_sql(
                table_class.__tablename__,
                con=engine,
                schema='dbo',
                if_exists='replace',
                index=False,
                chunksize=1000
            )
            print(f"Data successfully loaded into {table_class.__tablename__}.")

    except Exception as e:
        print(f"An error occurred: {e}") 

In [60]:
sales_2022_data,sales_2023_data,customer_data,product_data=extract();
sales_fact, product_dim, customer_dim, location_dim, time_dim = await transform(sales_2022_data, sales_2023_data, product_data, customer_data)
#print(sales_fact)
load(sales_fact,product_dim,customer_dim,location_dim,time_dim)

  sales_2022_data = pd.read_sql(


Successfully fetched sales_2022 data.
Successfully loaded sales_2023 data.
Successfully loaded customer data.
Successfully loaded product data.
Connected to the database successfully.
Data successfully loaded into sales_fact.
Data successfully loaded into product_dim.
Data successfully loaded into customer_dim.
Data successfully loaded into location_dim.
Data successfully loaded into time_dim.
