In [1]:
!pip install flask flask-ngrok

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import pandas as pd
import numpy as np
import sqlite3
from datetime import datetime

# CARGAR LOS DATOS DEL ARCHIVO CSV
df_sales = pd.read_csv("sales_data.csv")

###############################################################################
#                             DATA CLEANING                                   #
###############################################################################
# REPLACE MISSING QUANTITIES WITH 0
df_sales['quantity'] = df_sales['quantity'].fillna(0)
df_sales

# REPLACE INVALID PRICE VALUES AND MISSING PRICES WITH MEDIAN PRICE PER CATEGORY
def clean_price(row):
    try:
        # CONVERT PRICE A TYPE FLOAT
        return float(row['price'])
    except ValueError:
        # CALCULE MEDIAN PRICE
        median_price = df_sales.loc[df_sales['category'] == row['category'], 'price']
        median_price = pd.to_numeric(median_price, errors='coerce').median()
        return median_price if not np.isnan(median_price) else 0

df_sales['price'] = df_sales.apply(clean_price, axis=1)
df_sales

# DROP ROWS WHERE BOTH QUANTITY AND PRICE ARE INVALID OR MISSING
df_sales = df_sales[(df_sales['quantity'] > 0) | (df_sales['price'] > 0)]
df_sales

Unnamed: 0,transaction_id,date,category,product,quantity,price
0,1,2024-07-01,Widget,Widget-A,10.0,9.99
1,2,2024-07-01,Gadget,Gadget-X,5.0,19.99
2,3,2024-07-02,Widget,Widget-B,7.0,9.99
3,4,2024-07-02,Doodad,Doodad-1,0.0,4.99
4,5,2024-07-03,Widget,Widget-C,3.0,9.99
5,6,2024-07-03,Gadget,Gadget-Y,8.0,19.99
6,7,2024-07-04,Widget,Widget-A,2.0,9.99
7,8,2024-07-04,Doodad,Doodad-2,4.0,4.99
8,9,2024-07-05,Widget,Widget-B,6.0,9.99
9,10,2024-07-05,Gadget,Gadget-X,3.0,19.99


In [3]:
###############################################################################
#                             DERIVED COLUMNS                                 #
###############################################################################

# CALCULATE TOTAL_SALES
df_sales['total_sales'] = df_sales['quantity'] * df_sales['price']

# ADD DAY_OF_WEEK COLUMN
df_sales['day_of_week'] = pd.to_datetime(df_sales['date']).dt.day_name()

# ADD HIGH_VOLUME FLAG
df_sales['high_volume'] = df_sales['quantity'] > 10
df_sales

Unnamed: 0,transaction_id,date,category,product,quantity,price,total_sales,day_of_week,high_volume
0,1,2024-07-01,Widget,Widget-A,10.0,9.99,99.9,Monday,False
1,2,2024-07-01,Gadget,Gadget-X,5.0,19.99,99.95,Monday,False
2,3,2024-07-02,Widget,Widget-B,7.0,9.99,69.93,Tuesday,False
3,4,2024-07-02,Doodad,Doodad-1,0.0,4.99,0.0,Tuesday,False
4,5,2024-07-03,Widget,Widget-C,3.0,9.99,29.97,Wednesday,False
5,6,2024-07-03,Gadget,Gadget-Y,8.0,19.99,159.92,Wednesday,False
6,7,2024-07-04,Widget,Widget-A,2.0,9.99,19.98,Thursday,False
7,8,2024-07-04,Doodad,Doodad-2,4.0,4.99,19.96,Thursday,False
8,9,2024-07-05,Widget,Widget-B,6.0,9.99,59.94,Friday,False
9,10,2024-07-05,Gadget,Gadget-X,3.0,19.99,59.97,Friday,False


In [4]:
###############################################################################
#                             COMPLEX TRANSFORMATIONS                         #
###############################################################################

# GROUP DF_SALES BY CATEGORY AND CALCULATE METRICS
category_metrics = df_sales.groupby('category').agg(
    avg_price=('price', 'mean'),
    total_revenue=('total_sales', 'sum')
).reset_index()

# ADD THE DAY WITH THE MOST SALES BY CATEGORY
day_with_highest_sales = (
    df_sales.groupby(['category', 'day_of_week'])['total_sales']
    .sum()
    .reset_index()
    .sort_values('total_sales', ascending=False)
    .groupby('category')
    .first()
    .reset_index()[['category', 'day_of_week']]
)
day_with_highest_sales.rename(columns={'day_of_week': 'day_with_highest_sales'}, inplace=True)

# RESULS
category_metrics = category_metrics.merge(day_with_highest_sales, on='category')
category_metrics

# IDENTIFY OUTLIERS (QUANTITY > 2 STANDARD DEVIATIONS FROM CATEGORY MEAN)
def flag_outliers(group):
    threshold = group['quantity'].mean() + 2 * group['quantity'].std()
    group['outlier'] = group['quantity'] > threshold
    return group

df_sales = df_sales.groupby('category').apply(lambda group: flag_outliers(group.reset_index(drop=True))).reset_index(drop=True)
df_sales

Unnamed: 0,transaction_id,date,category,product,quantity,price,total_sales,day_of_week,high_volume,outlier
0,4,2024-07-02,Doodad,Doodad-1,0.0,4.99,0.0,Tuesday,False,False
1,8,2024-07-04,Doodad,Doodad-2,4.0,4.99,19.96,Thursday,False,False
2,12,2024-07-06,Doodad,Doodad-3,1.0,4.99,4.99,Saturday,False,False
3,16,2024-07-08,Doodad,Doodad-1,2.0,4.99,9.98,Monday,False,False
4,19,2024-07-10,Doodad,Doodad-2,4.0,4.99,19.96,Wednesday,False,False
5,22,2024-07-11,Doodad,Doodad-3,3.0,4.99,14.97,Thursday,False,False
6,25,2024-07-13,Doodad,Doodad-1,2.0,4.99,9.98,Saturday,False,False
7,28,2024-07-14,Doodad,Doodad-2,4.0,4.99,19.96,Sunday,False,False
8,31,2024-07-16,Doodad,Doodad-3,1.0,4.99,4.99,Tuesday,False,False
9,34,2024-07-17,Doodad,Doodad-1,2.0,4.99,9.98,Wednesday,False,False


In [5]:
###############################################################################
#                                   STORAGE                                   #
###############################################################################

# CREATE SQLITE DATABASE
conn = sqlite3.connect('sales_dashboard.db')

# SAVE CLEANED DF_SALES
cleaned_data_table = df_sales[['transaction_id', 'date', 'category', 'product', 'quantity', 'price', 'total_sales', 'day_of_week', 'high_volume']]
cleaned_data_table.to_sql('Transactions', conn, if_exists='replace', index=False)

# SAVE AGGREGATED METRICS
category_metrics.to_sql('CategoryMetrics', conn, if_exists='replace', index=False)

# SAVE OUTLIERS
outliers_table = df_sales[df_sales['outlier']]
outliers_table.to_sql('Outliers', conn, if_exists='replace', index=False)

0

In [None]:
from flask import Flask, jsonify, request
import sqlite3

app = Flask(__name__)

# CONNECTION TO SQLITE DATABASE
def get_db_connection():
    try:
        conn = sqlite3.connect("sales_dashboard.db")
        conn.row_factory = sqlite3.Row
        return conn
    except sqlite3.Error as e:
        return jsonify({"error": f"Database connection failed: {str(e)}"}), 500

# API SALE PRODUCT
@app.route("/sales/product", methods=["GET"])
def get_sales_by_product():
    try:
        product = request.args.get("product")
        category = request.args.get("category")

        query = "SELECT product, SUM(total_sales) as total_sales FROM Transactions"
        filters = []
        if product:
            filters.append(f"product = '{product}'")
        if category:
            filters.append(f"category = '{category}'")
        if filters:
            query += " WHERE " + " AND ".join(filters)
        query += " GROUP BY product"

        conn = get_db_connection()
        rows = conn.execute(query).fetchall()
        conn.close()
        return jsonify([dict(row) for row in rows])
    except sqlite3.Error as e:
        return jsonify({"error": f"Database query failed: {str(e)}"}), 500
    except Exception as e:
        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500

# API SALE DAY
@app.route("/sales/day", methods=["GET"])
def get_sales_by_day():
    try:
        date_start = request.args.get("start_date")
        date_end = request.args.get("end_date")

        query = "SELECT date, SUM(total_sales) as total_sales FROM Transactions"
        if date_start and date_end:
            query += f" WHERE date BETWEEN '{date_start}' AND '{date_end}'"
        query += " GROUP BY date"

        conn = get_db_connection()
        rows = conn.execute(query).fetchall()
        conn.close()
        return jsonify([dict(row) for row in rows])
    except sqlite3.Error as e:
        return jsonify({"error": f"Database query failed: {str(e)}"}), 500
    except Exception as e:
        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500

# API SALE CATEGORY
@app.route("/sales/category", methods=["GET"])
def get_category_metrics():
    try:
        conn = get_db_connection()
        rows = conn.execute("SELECT * FROM CategoryMetrics").fetchall()
        conn.close()
        return jsonify([dict(row) for row in rows])
    except sqlite3.Error as e:
        return jsonify({"error": f"Database query failed: {str(e)}"}), 500
    except Exception as e:
        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500

# API SALE OUTLIERS
@app.route("/sales/outliers", methods=["GET"])
def get_outliers():
    try:
        conn = get_db_connection()
        rows = conn.execute("SELECT * FROM Outliers").fetchall()
        conn.close()
        return jsonify([dict(row) for row in rows])
    except sqlite3.Error as e:
        return jsonify({"error": f"Database query failed: {str(e)}"}), 500
    except Exception as e:
        return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=True, use_reloader=False)

 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://192.168.0.116:5000
Press CTRL+C to quit
127.0.0.1 - - [14/Jan/2025 12:08:37] "GET /sales/product HTTP/1.1" 200 -


In [None]:
import pytest
from app import app

@pytest.fixture
def client():
    with app.test_client() as client:
        yield client

# TEST API SALE PRODUCT        
def test_sales_by_product(client):
    response = client.get("/sales/product", query_string={"product": "Widget-A"})
    assert response.status_code == 200
    data = response.get_json()
    assert isinstance(data, list)
    if data:
        assert "product" in data[0]
        assert "total_sales" in data[0]

# TEST API SALE DAY
def test_sales_by_day(client):
    response = client.get("/sales/day", query_string={"start_date": "2024-07-01", "end_date": "2024-07-31"})
    assert response.status_code == 200
    data = response.get_json()
    assert isinstance(data, list)
    if data:
        assert "date" in data[0]
        assert "total_sales" in data[0]

# TEST API SALE CATEGORY
def test_category_metrics(client):
    response = client.get("/sales/category")
    assert response.status_code == 200
    data = response.get_json()
    assert isinstance(data, list)
    if data:
        assert "category" in data[0]
        assert "total_revenue" in data[0]

# TEST API OUTLIERS
def test_outliers(client):
    response = client.get("/sales/outliers")
    assert response.status_code == 200
    data = response.get_json()
    assert isinstance(data, list)
    if data:
        assert "outlier" in data[0]


In [None]:
#Expected output