In [1]:
import pandas as pd
import numpy as np
import os
import json
import requests
from pandas import json_normalize

In [2]:
import sqlite3
import pandas as pd

class Database:
    def __init__(self, db_name):
        self.db_name = db_name
        self.connection = None

    def connect(self):
        try:
            # Connect to the SQLite database. If it does not exist, it will be created.
            self.connection = sqlite3.connect(self.db_name)
            #print(f"Connected to SQLite database: {self.db_name}")
        except sqlite3.Error as e:
            print(f"Error: Unable to connect to SQLite database. {e}")

    def close_connection(self):
        if self.connection:
            self.connection.close()
            #print(f"Connection to SQLite database {self.db_name} closed.")
            self.connection = None

    def create_table(self, table_name, columns_dict):
        try:
            # Use a cursor to execute SQL commands
            cursor = self.connection.cursor()

            # Construct the CREATE TABLE query
            columns_str = ', '.join([f"{col_name} {col_desc}" for col_name, col_desc in columns_dict.items()])
            create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_str});"

            # Execute the query
            cursor.execute(create_table_query)
            print(f"Table {table_name} created (if not exists).")

            # Commit the changes and close the cursor
            self.connection.commit()
            cursor.close()

        except sqlite3.Error as e:
            print(f"Error: Unable to create table {table_name}. {e}")

    def insert_dataframe(self, table_name, dataframe, if_exists_m='append'):
        try:
            # Use the to_sql method to insert the DataFrame into the database
            dataframe.to_sql(table_name, self.connection, if_exists=if_exists_m, index=False)
            print(f"Data inserted into table {table_name}.")

            # Commit the changes
            self.connection.commit()

        except sqlite3.Error as e:
            print(f"Error: Unable to insert data into table {table_name}. {e}")

    def select_table(self, table_name):
        try:
            cursor = self.connection.cursor()
            cursor.execute(f"SELECT * FROM {table_name}")
            rows = cursor.fetchall()
            columns = [col[0] for col in cursor.description]
            result_df = pd.DataFrame(rows, columns=columns)
            cursor.close()
            return result_df

        except sqlite3.Error as e:
            print(f"Error: Unable to execute SELECT query on table {table_name}. {e}")
            return None
    
    def initialize_tables(self):
        self.connect()
        sales_table = "dwd_sales_data"
        columns_dict_sales_table = {"id": "INTEGER PRIMARY KEY", "order_id": "INTEGER", "customer_id": "INTEGER", "product_id": "INTEGER", "quantity": "INTEGER", "price": "DECIMAL", "order_date": "DATE"}
        self.create_table(sales_table, columns_dict_sales_table)

        users_data_table = "dim_users"
        columns_dict_users_data_table = {"id": "INTEGER", "name": "TEXT", "username": "TEXT", "email": "TEXT", "phone": "TEXT", "website": "TEXT", "street": "TEXT","suite": "TEXT", "city": "TEXT", "zipcode": "TEXT", "lat":"DECIMAL", "lng":"DECIMAL" ,"name": "TEXT","catchPhrase": "TEXT","bs": "TEXT"}
        self.create_table(users_data_table, columns_dict_users_data_table)

        weather_table = "weather_info_raw"
        columns_dict_weather_table = {"id": "INTEGER PRIMARY KEY", "lat": "DECIMAL", "lng": "DECIMAL", "feature": "TEXT"}
        self.create_table(weather_table, columns_dict_weather_table)

        self.close_connection()

In [3]:
class DataSources:
    def __init__(self):
        pass
    def get_weather_data(self, lat, lng, api_key='b251aa99ea5b80b71785ff34d7da8056'):
        try:
            url = f"https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lng}&appid={api_key}"
            print(url)
            response = requests.get(url)
            weather_data = response.json()
            return weather_data
        except:
            return None
    
    
    def read_json_user_data(self, link):
        response = requests.get(link)
        users_data = response.json()
        df = pd.DataFrame(users_data)
        df_expand = pd.concat(
            [df.drop(['address', 'company'], axis=1), json_normalize(df['address']), json_normalize(df['company'])],
            axis=1).reset_index(drop=True)
        return df_expand
    
    
    def read_sales_data(self, path_to_file):
        return pd.read_csv(path_to_file, sep=',')
    
    def read_weather_data(self, table_name, lat, lng):
        db_instance.connect()
        data_stored_in_db = db_instance.select_table(table_name)
        data_stored_in_db = data_stored_in_db[(data_stored_in_db['lat'] == lat) & (data_stored_in_db['lng'] == lng)]
    
        if len(data_stored_in_db) > 0:
            db_instance.close_connection()
            pass
            return None #json.loads((data_stored_in_db.loc[0]['feature']).replace("'", '"'))
        else:
            print('Lat and Lng are in not database')
            weather_data_from_api = self.get_weather_data(lat, lng)
            weather_data_to_insert = pd.DataFrame([{'lat': lat, 'lng': lng, 'feature': str(weather_data_from_api)}])
            db_instance.insert_dataframe(table_name, weather_data_to_insert)
            db_instance.close_connection()
            return weather_data_from_api
    


In [4]:
SLSH = os.path.sep
cwd = os.getcwd()
db_name = 'primary.db'
db_instance = Database(db_name)
get_data = DataSources()
db_instance.initialize_tables()

Table dwd_sales_data created (if not exists).
Table dim_users created (if not exists).
Table weather_info_raw created (if not exists).


In [5]:
path_to_sales_file = cwd + SLSH + 'sales_data.csv'
sales_data = get_data.read_sales_data(path_to_sales_file)
print(sales_data.shape)
db_instance.connect()
db_instance.insert_dataframe('dwd_sales_data', sales_data)
db_instance.close_connection()

(1000, 6)
Data inserted into table dwd_sales_data.


In [6]:
link_to_json = 'https://jsonplaceholder.typicode.com/users'
users_data = get_data.read_json_user_data(link_to_json)
users_data['geo.lng'] = pd.to_numeric(users_data['geo.lng'], errors='coerce')
users_data['geo.lat'] = pd.to_numeric(users_data['geo.lat'], errors='coerce')
users_data = users_data.rename(columns={'geo.lng': 'lng'})
users_data = users_data.rename(columns={'geo.lat': 'lat'})

print(users_data.shape)
db_instance.connect()
db_instance.insert_dataframe('dim_users', users_data)
db_instance.close_connection()

(10, 15)
Data inserted into table dim_users.


In [8]:
lat_lng_unique_set = users_data[['lat', 'lng']]
lat_lng_dict = lat_lng_unique_set.set_index('lat')['lng'].to_dict()
for latValue, lngValue in lat_lng_dict.items():
    weather_data = get_data.read_weather_data(table_name="weather_info_raw", lat=latValue, lng=lngValue)

db_instance.connect()
weather_data = db_instance.select_table('weather_info_raw')
db_instance.close_connection()

weather_details_df = pd.DataFrame(weather_data['feature'])
weather_details_df['feature'] = weather_details_df['feature'].apply(lambda x: json.loads(x.replace("'", '"')))
weather_details_df = json_normalize(weather_details_df['feature']).reset_index(drop=True)
weather_details_df = pd.concat([json_normalize(weather_details_df['weather'].explode()), weather_details_df], axis=1).reset_index(drop=True)
weather_details_df.columns = [x.replace('.','_') for x in list(weather_details_df.columns)]
weather_details_df = weather_details_df.astype(str)
dict_to_create_table = dict()
for column in weather_details_df.columns:
    dict_to_create_table[column] = "TEXT" 
db_instance.connect()
db_instance.create_table('dim_weather_details', dict_to_create_table)
db_instance.insert_dataframe('dim_weather_details', weather_details_df)
db_instance.close_connection()

Table dim_weather_details created (if not exists).
Data inserted into table dim_weather_details.


In [14]:
weather_details_df.columns

Index(['id', 'main', 'description', 'icon', 'weather', 'base', 'visibility',
       'dt', 'timezone', 'id', 'name', 'cod', 'coord_lon', 'coord_lat',
       'main_temp', 'main_feels_like', 'main_temp_min', 'main_temp_max',
       'main_pressure', 'main_humidity', 'main_sea_level', 'main_grnd_level',
       'wind_speed', 'wind_deg', 'wind_gust', 'clouds_all', 'sys_sunrise',
       'sys_sunset', 'rain_1h'],
      dtype='object')

In [9]:
sales_users_data = sales_data.merge(users_data, how='left', left_on='customer_id', right_on='id')

In [17]:
weather_details_df['coord_lon'] = pd.to_numeric(weather_details_df['coord_lon'], errors='coerce')
weather_details_df['coord_lat'] = pd.to_numeric(weather_details_df['coord_lat'], errors='coerce')
sales_user_weather_df = pd.merge(sales_users_data, weather_details_df, how='left', left_on=['lng', 'lat'], right_on=['coord_lon', 'coord_lat'])

In [22]:
query_df = sales_user_weather_df.copy()
query_df['qty_x_price'] = query_df['quantity'] * query_df['price']

In [106]:
class QueryMgmt:
    def __init__(self, df):
        self.query_df = df 

    def transformation(self):
        self.query_df['qty_x_price'] = self.query_df['quantity'] * self.query_df['price']
        self.query_df[['year', 'month', 'day']] = query_df['order_date'].str.split('-',expand=True)
        self.query_df['year_month'] = self.query_df['year'] + '-' + self.query_df['month']
        self.query_df['year'] = self.query_df['year'].astype(int)
        self.query_df['month'] = self.query_df['month'].astype(int)
        self.query_df['day'] = self.query_df['day'].astype(int)
        self.query_df['year_quarter'] = (self.query_df['year'].astype(str)).str[-2:] + '-Q' + self.query_df['quarter'].astype(str)

    def total_sales_per_customer(self):
        return self.query_df.groupby('customer_id')["qty_x_price"].sum().reset_index()
        
    def avg_order_qty_per_product(self):
        return self.query_df.groupby('product_id')['quantity'].mean().reset_index()
    
    def top_selling_products(self):
        return self.query_df.groupby('product_id')['qty_x_price'].sum().reset_index().sort_values(by='qty_x_price', ascending=False)

    def top_customers(self):
        return self.query_df.groupby('username')['qty_x_price'].sum().reset_index().sort_values(by='qty_x_price', ascending=False)
    
    def sales_trend_over_time(self, time_period):
        if time_period == 'Month':
            return self.query_df.groupby('year_month').agg(Count_of_Orders=('order_id', 'count'), Count_of_items=('product_id', 'count'), Sum_of_Sales=('qty_x_price', 'sum'), Avg_Sales=('qty_x_price', 'mean'), Number_of_Custumers=('customer_id', 'count'), Number_of_Unique_Custumers=('customer_id', pd.Series.nunique)).sort_values(by='year_month', ascending=True)
        elif time_period == 'Quarter':
            return self.query_df.groupby('year_quarter').agg(Count_of_Orders=('order_id', 'count'), Count_of_items=('product_id', 'count'), Sum_of_Sales=('qty_x_price', 'sum'), Avg_Sales=('qty_x_price', 'mean'), Number_of_Custumers=('customer_id', 'count'), Number_of_Unique_Custumers=('customer_id', pd.Series.nunique)).sort_values(by='year_quarter', ascending=False)
        elif time_period == 'Year':
            return self.query_df.groupby('year').agg(Count_of_Orders=('order_id', 'count'), Count_of_items=('product_id', 'count'), Sum_of_Sales=('qty_x_price', 'sum'), Avg_Sales=('qty_x_price', 'mean'), Number_of_Custumers=('customer_id', 'count'), Number_of_Unique_Custumers=('customer_id', pd.Series.nunique)).sort_values(by='year', ascending=False)
        else:
            return None

    def sales_trend_over_weather(self, time_perid):
        return query_df.groupby(['product_id', 'description']).agg(Count_of_Orders=('order_id', 'count'), Count_of_items=('product_id', 'count'), Sum_of_Sales=('qty_x_price', 'sum'), Avg_Sales=('qty_x_price', 'mean'), Number_of_Custumers=('customer_id', 'count'), Number_of_Unique_Custumers=('customer_id', pd.Series.nunique)).sort_values(by='description', ascending=False)


In [107]:
#Calculate total sales amount per customer.
#query_df.groupby('customer_id')["qty_x_price"].sum().reset_index()

In [108]:
#Determine the average order quantity per product.
#query_df.groupby('product_id')['quantity'].mean().reset_index()
#query_df.groupby('product_id').agg(Count_of_Orders=('order_id', 'count'), Count_of_items=('product_id', 'count'))
#sales_data.columns

In [101]:
#Identify the top-selling products or customers
#Products
#query_df.groupby('product_id')['qty_x_price'].sum().reset_index().sort_values(by='qty_x_price', ascending=False).head(5)

In [102]:
#Customers
#query_df.groupby('username')['qty_x_price'].sum().reset_index().sort_values(by='qty_x_price', ascending=False).head(5)

In [103]:
#Analyze sales trends over time (e.g., monthly or quarterly sales)
#query_df[['year', 'month', 'day']] = query_df['order_date'].str.split('-',expand=True)
#query_df['year_month'] = query_df['year'] + '-' + query_df['month']
#query_df.groupby('year_month').agg(Count_of_Orders=('order_id', 'count'), Count_of_items=('product_id', 'count'), Sum_of_Sales=('qty_x_price', 'sum'), Avg_Sales=('qty_x_price', 'mean'), Number_of_Custumers=('customer_id', 'count'), Number_of_Unique_Custumers=('customer_id', pd.Series.nunique)).sort_values(by='year_month', ascending=True)

In [109]:
# query_df['year'] = query_df['year'].astype(int)
# query_df['month'] = query_df['month'].astype(int)
# query_df['day'] = query_df['day'].astype(int)
# query_df['year_quarter'] = (query_df['year'].astype(str)).str[-2:] + '-Q' + query_df['quarter'].astype(str)
#query_df.groupby('year').agg(Count_of_Orders=('order_id', 'count'), Count_of_items=('product_id', 'count'), Sum_of_Sales=('qty_x_price', 'sum'), Avg_Sales=('qty_x_price', 'mean'), Number_of_Custumers=('customer_id', 'count'), Number_of_Unique_Custumers=('customer_id', pd.Series.nunique)).sort_values(by='year', ascending=False)

In [110]:
#query_df.groupby(['product_id', 'description']).agg(Count_of_Orders=('order_id', 'count'), Count_of_items=('product_id', 'count'), Sum_of_Sales=('qty_x_price', 'sum'), Avg_Sales=('qty_x_price', 'mean'), Number_of_Custumers=('customer_id', 'count'), Number_of_Unique_Custumers=('customer_id', pd.Series.nunique)).sort_values(by='description', ascending=False)

In [111]:
#query_df.head()