In [20]:
## The only packages we would need for this pipeline are `Requests`, `os`, `Pandas`, `SQLite`, and `Streamlit`.
## Requests is to make HTTP requests to fetch the data from REST API endpoints
## And Pandas is for data transformations and wrangling.

import requests
import pandas as pd
import os
import json
import re

api_key = os.getenv('API_KEY')

sales = pd.read_csv("./data/sales_data.csv", parse_dates=["order_date"])

users = (pd.json_normalize(requests
            .get("https://jsonplaceholder.typicode.com/users")
            .json(), sep="_")[["id",
                               "name",
                               "username",
                               "email",
                               "address_geo_lat",
                               "address_geo_lng"]]
                                   .rename(
                                       columns=
                                       {
                                            "id": "customer_id",
                                            "address_geo_lat": "lat",
                                            "address_geo_lng": "lon"
                                    }
                                )
                            )

users[["lat", "lon"]] = (users[["lat", "lon"]]
                         .astype(float))

final = (sales
          .merge(users, on='customer_id'))

weather = []

for index, row in users.iterrows():
    res = (requests
            .get('https://api.openweathermap.org/data/2.5/weather?appid={key}&lon={lon}&lat={lat}&units=metric'
                .format(key = api_key,
                         lon = row['lon'],
                         lat=row['lat']))
                    .json())
    
    res["customer_id"] = row["customer_id"]

    res = (json.loads(
        re.sub(r'\[|\]', "", json.dumps(res))))
    
    weather.append(res)

weather = (pd.json_normalize(weather, sep="_"))
weather_transaction = (weather
                       .merge(sales[["customer_id", "order_id"]], on="customer_id")
                       .drop(columns="customer_id"))

final

Unnamed: 0,order_id,customer_id,product_id,quantity,price,order_date,name,username,email,lat,lon
0,2334,5,40,3,35.60,2022-06-21,Chelsey Dietrich,Kamren,Lucio_Hettinger@annie.ca,-31.8129,62.5342
1,6588,5,26,1,15.87,2022-10-23,Chelsey Dietrich,Kamren,Lucio_Hettinger@annie.ca,-31.8129,62.5342
2,3569,5,47,9,19.35,2023-05-25,Chelsey Dietrich,Kamren,Lucio_Hettinger@annie.ca,-31.8129,62.5342
3,5200,5,50,6,82.12,2023-02-28,Chelsey Dietrich,Kamren,Lucio_Hettinger@annie.ca,-31.8129,62.5342
4,1589,5,10,1,96.94,2023-01-05,Chelsey Dietrich,Kamren,Lucio_Hettinger@annie.ca,-31.8129,62.5342
...,...,...,...,...,...,...,...,...,...,...,...
995,9985,6,2,10,24.47,2023-02-21,Mrs. Dennis Schulist,Leopoldo_Corkery,Karley_Dach@jasper.info,-71.4197,71.7478
996,7971,6,11,9,82.94,2022-11-01,Mrs. Dennis Schulist,Leopoldo_Corkery,Karley_Dach@jasper.info,-71.4197,71.7478
997,9182,6,38,6,18.15,2023-04-22,Mrs. Dennis Schulist,Leopoldo_Corkery,Karley_Dach@jasper.info,-71.4197,71.7478
998,7967,6,34,10,14.44,2022-07-26,Mrs. Dennis Schulist,Leopoldo_Corkery,Karley_Dach@jasper.info,-71.4197,71.7478


In [21]:
## Assuming the price in the price dataset is the unit price, we can calculated the sales value as a product of the product quantity and the price.
## I will use a lambda function to create a new column with the product of the two columns.

agg_data = (final
 .assign(sale_value = lambda x: (x['price'] * x['quantity'])))

## Total sales by customer

sales_by_cus = (agg_data[["name", "customer_id", "sale_value"]]
.groupby('name')
.sum('sale_value')
.reset_index()
.rename(columns={'sale_value': 'customer_spending'}))

## Average order quantity

avg_quant = (agg_data[["quantity", "product_id"]]
 .groupby('product_id', as_index=False)
 .mean('quantity'))

## Highest revenue generating products

top_selling = (agg_data[["name", "product_id", "sale_value"]]
 .groupby('product_id')
 .sum('sale_value')
 .sort_values('sale_value', ascending=False)
 .head(10)
 .reset_index())

## Series of sales volume by month and year

monthly_volume = (agg_data[["order_date", "sale_value"]]
 .groupby(agg_data["order_date"].dt.to_period('M'))
 .sum("sale_value")
 .reset_index())

monthly_volume["order_date"] = monthly_volume["order_date"].astype(str)

## average sale price by weather condition

avg_sale_weather = (agg_data[["customer_id", "sale_value"]]
 .merge(weather[["weather_main", "customer_id"]], on='customer_id')
 .groupby("weather_main", as_index=False)[["weather_main", "sale_value"]]
 .mean("sale_value"))

aggregates = [sales_by_cus, avg_quant, top_selling, monthly_volume, avg_sale_weather]
aggregate_tables = ['sales_by_cus', 'avg_quant', 'top_selling', 'monthly_volume', 'avg_sale_weather']

In [22]:
import sqlite3

con = sqlite3.connect("transasctions.db")
cur = con.cursor()

final.to_sql("sales_merged", con, if_exists='replace', index=False)

k = 0

for i in aggregates:
    i.to_sql(aggregate_tables[k], con, if_exists='replace', index=False)
    k += 1