<a href="https://colab.research.google.com/github/Sreelatha3/RetailSales_Datapipline_Azure_PowerBI/blob/main/SalesAnalysis_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

As a first step, we login to azure account through cli and fill out required information.

In [None]:
!az login

Then we create a resource SalesResourceGroup as

In [None]:
!az group create --name SalesResourceGroup --location centralindia

Under that resource group we create azure sql server with the following parameters:

1. server: neostatssalesserver
2. admin-user: salesadmin
3. admin-password: Ahtaleers@2507

In [None]:
!az sql server create --name neostatssalesserver --resource-group SalesResourceGroup --location centralindia --admin-user salesadmin --admin-password Ahtaleers@2507

With in that server we create salesdb with the following command

In [None]:
!az sql db create --resource-group SalesResourceGroup --server neostatssalesserver --name salesdb --service-objective S0

Then we open a firewall from our clients to connect to azure sql. In this case we allow everyone to connect to our sql server

In [None]:
!az sql server firewall-rule create --resource-group SalesResourceGroup --server neostatssalesserver --name AllowedIPs --start-ip-address 0.0.0.0 --end-ip-address 255.255.255.255

Install following python packages:
sqlalchemy
pyodbc

In [None]:
%pip install sqlalchemy pyodbc

Setup connection sqlalchemy connection to our azure sql server

In [None]:
from sqlalchemy import create_engine, text

# Connection properties
username = 'salesadmin'
password = 'Ahtaleers%402507'
server = 'neostatssalesserver'
database = 'salesdb'

# Connection string
connection_string = f"mssql+pyodbc://{username}:{password}@{server}.database.windows.net/{database}?driver=ODBC+Driver+17+for+SQL+Server"

# Create an SQLAlchemy engine
engine = create_engine(connection_string)


Declare Sales Table

In [None]:
from sqlalchemy import Column, Integer, String, Float, Date
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class SalesRaw(Base):
    __tablename__ = 'SALES_RAW'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    orderId = Column('orderid', String(10), nullable=False)
    customerName = Column('customername', String(100), nullable=False)
    phoneNumber = Column('phonenumber', String(20), nullable=True)
    location = Column('location', String(100), nullable=False)
    country = Column('country', String(50), nullable=False)
    storeCode = Column('storecode', String(20), nullable=False)
    product = Column('product', String(100), nullable=False)
    quantity = Column('quantity', Integer, nullable=False)
    price = Column('price', Float, nullable=False)
    purchaseDate = Column('purchasedate', Date, nullable=True)
    creditCardNumber = Column('creditcardnumber', String(16), nullable=True)
    expiryDate = Column('expirydate', Date, nullable=True)


Create Sales table

In [None]:
Base.metadata.create_all(engine)

Create sql alchemy session

In [None]:
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)
session = Session()

Read Sales data from disk and insert into SALES_RAW table while identifying the data anamolies

In [None]:
import csv
import os
from datetime import datetime

# Read csv file
with open(os.path.expanduser('~/Dev/SalesAnalysis/Sample_Data_Ingestion.csv'), newline='') as csvfile:
    reader = csv.DictReader(csvfile)

    # Skip the header
    header = next(reader)

    # Insert data into table
    rowNum = 0
    for row in reader:
        rowNum = rowNum + 1
        try:
            enr_num = row['PhoneNumber'].split(' ', 1)[1].replace(' ', '').replace('-', '')
            if (len(enr_num) != 10):
                raise Exception
        except:
            print(f'Warning: On row with number {rowNum}, the phone number {row["PhoneNumber"]} failed the validations')
            row['PhoneNumber'] = None

        try:
            enr_quantity = int(float(row['Quantity']))
            row['Quantity'] = enr_quantity
            if enr_quantity != float(row['Quantity']):
                print(f'Warning: On row with number {rowNum}, float value found for the quantity, casting it to int')
        except:
            print(f'Warning: On row with number {rowNum}, failed to cast the quantity {row["Quantity"]} to int. Skiping the row')
            continue

        try:
            row['Price'] = float(row['Price'])
        except:
            print(f'Warning: On row with number {rowNum}, the price {row["Price"]} is not in valid format. Skipping the row')
            continue

        try:
            enr_ccnum = row['CreditCardNumber'].replace(' ', '').replace('-', '')
            if len(enr_ccnum) != 16:
                raise Exception

            row['CreditCardNumber'] = enr_ccnum
        except:
            print(f'Warning: On row with number {rowNum}, the credit card number {row["CreditCardNumber"]} failed the validations')
            row['CreditCardNumber'] = None

        try:
            enr_pdate = datetime.strptime(row['Date'], '%d-%m-%Y').date()
            row['Date'] = enr_pdate
        except:
            print( f'Warning: On row with number {rowNum}, purchase date {row["Date"]} is not in correct format')
            row['Date'] = None
            enr_pdate = None

        try:
            enr_edate = datetime.strptime(row['ExpiryDate'], '%b-%y').date()
            if enr_pdate != None and enr_pdate > enr_edate:
                print(f'Warning: On row with number {rowNum}, purchase date is greater than expiry date skipping the row.')
                continue

            row['ExpiryDate'] = enr_edate
        except:
            print(f'Warning: On row with number {rowNum}, expiry date {row["ExpiryDate"]} is not in correct format')
            row['ExpiryDate'] = None

        saleRaw = SalesRaw(
            orderId=row['OrderID'],
            customerName=row['CustomerName'],
            phoneNumber=row['PhoneNumber'],
            location=row['Location'],
            country=row['Country'],
            storeCode=row['StoreCode'],
            product=row['Product'],
            quantity=row['Quantity'],
            price=row['Price'],
            purchaseDate=row['Date'],
            creditCardNumber=row['CreditCardNumber'],
            expiryDate=row['ExpiryDate']
        )

        session.add(saleRaw)

session.commit()

Declare enriched sales table

In [None]:
class SalesEnrich(Base):
    __tablename__ = 'SALES_ENRICH'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    orderId = Column('orderid', String(10), nullable=False)
    customerName = Column('customername', String(100), nullable=False)
    phoneCountryCode = Column('phonecountrycode', String(5), nullable=True)
    phoneNumber = Column('phonenumber', String(10), nullable=True)
    location = Column('location', String(100), nullable=False)
    country = Column('country', String(50), nullable=False)
    storeCode = Column('storecode', String(20), nullable=False)
    product = Column('product', String(100), nullable=False)
    quantity = Column('quantity', Integer, nullable=False)
    price = Column('price', Float, nullable=False)
    priceUSD = Column('priceusd', Float, nullable=False)
    purchaseDate = Column('purchasedate', Date, nullable=True)
    creditCardNumber = Column('creditcardnumber', String(16), nullable=True)
    expiryDate = Column('expirydate', String(10), nullable=True)


Create enriched sales tabl

In [None]:
Base.metadata.create_all(engine)

Enrich sales data:

1. To correct the phone number code based on the country.
2. To convert the prices into USD


In [None]:
phone_code_dict = {
    'USA': '+1',
    'CANADA': '+1',
    'INDIA': '+91',
    'UK': '+44',
    'AUSTRALIA': '+61'
}

conversion_rates = {
    'USA': 1.00,
    'UK': 1.27,
    'CANADA': 0.74,
    'AUSTRALIA': 0.64,
    'INDIA': 0.012,
    'JAPAN': 0.0070,
    'GERMANY': 1.09,
    'FRANCE': 1.09,
    'CHINA': 0.14
}

raw_sales = session.query(SalesRaw).all()
for raw_sale in raw_sales:
    enr_pcd = None
    ern_pnum = None
    if raw_sale.phoneNumber:
        [curr_pcd, phone_num] = raw_sale.phoneNumber.split(' ', 1)
        ern_pnum = phone_num.replace(' ', '').replace('-', '')
        enr_pcd = phone_code_dict.get(raw_sale.country.upper())
        if not enr_pcd:
            enr_pcd = curr_pcd

    sale = SalesEnrich(
        orderId=raw_sale.orderId,
        customerName=raw_sale.customerName,
        phoneCountryCode=enr_pcd,
        phoneNumber=ern_pnum,
        location=raw_sale.location,
        country=raw_sale.country,
        storeCode=raw_sale.storeCode,
        product=raw_sale.product,
        quantity=raw_sale.quantity,
        price=raw_sale.price,
        priceUSD=round(raw_sale.price * conversion_rates.get(raw_sale.country.upper(), 0), 2),
        purchaseDate=raw_sale.purchaseDate,
        creditCardNumber=raw_sale.creditCardNumber,
        expiryDate=raw_sale.expiryDate
    )

    session.add(sale)

session.commit()


Insight: Sales Distribution by Product Type and Country

In [None]:
with engine.connect() as connection:
    insight_sql = """
    SELECT country,
        product,
        ROUND(SUM(priceusd), 2) AS totalsales
    FROM SALES_ENRICH
    GROUP BY Country, Product
    ORDER BY Country, totalsales DESC;
    """
    result = connection.execute(text(insight_sql))
    print('Country | Product | TotalSales')
    for row in result:
        print('|'.join(map(str, row)))





Insight: Average Order Value by Product Type and Location

In [None]:
with engine.connect() as connection:
    insight_sql = """
    SELECT location,
        product,
        ROUND(AVG(priceusd), 2) AS averageprice
    FROM SALES_ENRICH
    GROUP BY location, product
    ORDER BY location, averageprice DESC;
    """
    result = connection.execute(text(insight_sql))
    print('Location|Product|Average Price')
    for row in result:
        print('|'.join(map(str, row)))


Insight: Product Popularity by Country

In [None]:
with engine.connect() as connection:
    insight_sql = """
    SELECT country,
        product,
        SUM(quantity) AS totalUnitsSold
    FROM SALES_ENRICH
    GROUP BY country, product
    ORDER BY country, totalUnitsSold DESC;
    """
    result = connection.execute(text(insight_sql))
    print('Country|Product|Total Units Sold')
    for row in result:
        print('|'.join(map(str, row)))

Insight: Sales Trends by Product Type Over Time

In [None]:
with engine.connect() as connection:
    insight_sql = """
    SELECT country,
        product,
        DATEPART(MONTH, purchasedate) AS purchasemonth,
        DATEPART(YEAR, purchasedate)  AS purchaseyear,
        ROUND(SUM(priceusd), 2)    AS TotalSalesUSD
    FROM SALES_ENRICH
    GROUP BY country, product, DATEPART(YEAR, purchasedate), DATEPART(MONTH, purchasedate)
    ORDER BY purchaseyear, purchasemonth, country, product;
    """
    result = connection.execute(text(insight_sql))
    print('Country|Product|Purchase Month|Purchase Year|Total Sales (USD)')
    for row in result:
        print('|'.join(map(str, row)))

Insight: Customer Preferences by Location

In [None]:
with engine.connect() as connection:
    insight_sql = """
    SELECT location,
        product,
        SUM(quantity) AS totalunits
    FROM SALES_ENRICH
    GROUP BY location, product
    ORDER BY location, totalunits DESC;
    """
    result = connection.execute(text(insight_sql))
    print('Location|Product|Total Units')
    for row in result:
        print('|'.join(map(str, row)))

Insight: Revenue Contribution by Product Type and Country

In [None]:
with engine.connect() as connection:
    insight_sql = """
    SELECT country,
        product,
        ROUND(SUM(priceusd), 2)                                                          AS revenuecontribution,
        ROUND(SUM(priceusd) * 100.0 / SUM(SUM(priceusd)) OVER (PARTITION BY country), 2) as percentagecontribution
    FROM SALES_ENRICH
    GROUP BY country, product
    ORDER BY country, PercentageContribution DESC;
    """
    result = connection.execute(text(insight_sql))
    print('Country|Product|Revenu Contribution|Percentage Contribution')
    for row in result:
        print('|'.join(map(str, row)))