In [55]:
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import random
from geopy.geocoders import Nominatim
from retrying import retry
import uuid

pd.set_option("display.max_columns", None)

In [56]:
sample_df = pd.read_csv('sample_superstore.csv')
sample_df.head()

Unnamed: 0,row_id,order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,country,city,state,postal_code,region,product_id,category,sub_category,product_name,sales,quantity,discount,profit
0,1,CA-2016-152156,11/8/2016,11/11/2016,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase,261.96,2,0.0,41.9136
1,2,CA-2016-152156,11/8/2016,11/11/2016,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs,...",731.94,3,0.0,219.582
2,3,CA-2016-138688,6/12/2016,6/16/2016,Second Class,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036,West,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters b...,14.62,2,0.0,6.8714
3,4,US-2015-108966,10/11/2015,10/18/2015,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,FUR-TA-10000577,Furniture,Tables,Bretford CR4500 Series Slim Rectangular Table,957.5775,5,0.45,-383.031
4,5,US-2015-108966,10/11/2015,10/18/2015,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,OFF-ST-10000760,Office Supplies,Storage,Eldon Fold 'N Roll Cart System,22.368,2,0.2,2.5164


In [57]:
def get_lat_long(city, country):
    geolocator = Nominatim(user_agent='hauct_geopy_key')
    location = geolocator.geocode(f'{city}, {country}', timeout=10)

    if location:
        return location.latitude, location.longitude

    return None

In [58]:
# sample_df['lat_long'] = sample_df.apply(lambda row: get_lat_long(row['city'], row['country']), axis=1)
# sample_df.head()

In [59]:
def extract_customer_info(sample_df):
    customer_info_df = sample_df.loc[:, ["customer_id", "customer_name", 'segment', 'country', 'city']]
    customer_info_df = customer_info_df.drop_duplicates()
    return customer_info_df

In [60]:
customer_info_df = extract_customer_info(sample_df)

In [61]:
customer_info_df.head()

Unnamed: 0,customer_id,customer_name,segment,country,city
0,CG-12520,Claire Gute,Consumer,United States,Henderson
2,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles
3,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale
5,BH-11710,Brosina Hoffman,Consumer,United States,Los Angeles
12,AA-10480,Andrew Allen,Consumer,United States,Concord


In [62]:
customer_info_df.to_dict('records')[4]

{'customer_id': 'AA-10480',
 'customer_name': 'Andrew Allen',
 'segment': 'Consumer',
 'country': 'United States',
 'city': 'Concord'}

In [63]:
def generate_customer_info(customer_info_df):
    n = random.randint(0, len(customer_info_df))
    return customer_info_df.to_dict('records')[n]

In [64]:
def calculate_price(row):
    return row['sales'] / row['quantity'] / (1 - row['discount'])

def extract_product_info(sample_df):
    product_info = sample_df.loc[:, ["category", 'sub_category', 'product_name', 'sales', 'quantity', 'discount']]
    product_info['price'] = product_info.apply(calculate_price, axis=1)
    product_info = product_info.loc[:, ["category", 'sub_category', 'product_name', 'price']]
    product_info = product_info.drop_duplicates()
    return product_info

In [65]:
product_info = extract_product_info(sample_df)
product_info.head()

Unnamed: 0,category,sub_category,product_name,price
0,Furniture,Bookcases,Bush Somerset Collection Bookcase,130.98
1,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs,...",243.98
2,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters b...,7.31
3,Furniture,Tables,Bretford CR4500 Series Slim Rectangular Table,348.21
4,Office Supplies,Storage,Eldon Fold 'N Roll Cart System,13.98


In [66]:
def generate_product_info(product_info):
    n = random.randint(0, len(product_info))
    return product_info.to_dict('records')[n]

In [67]:
generate_product_info(product_info)

{'category': 'Office Supplies',
 'sub_category': 'Binders',
 'product_name': 'Avery Durable Slant Ring Binders',
 'price': 7.92}

In [68]:
def generate_date_time():
    day = random.randint(1, 31)
    hour = random.randint(0, 23)
    minute = random.randint(0, 59)
    second = random.randint(0, 59)

    timestamp = datetime(2023, 12, day, hour, minute, second)

    return timestamp.strftime('%Y-%m-%d %H:%M:%S')

In [69]:
import uuid
str(uuid.uuid4())

'7056844e-337f-4825-9149-08e12c9766d4'

In [70]:
def generate_log(n_records):
    customer_info_df = extract_customer_info(sample_df)
    product_info_df = extract_product_info(sample_df)
    discount_list = [0.5, 0.4, 0.3, 0]
    
    for _ in range(n_records):
        customer_info_dict = generate_customer_info(customer_info_df)
        product_info_dict = generate_product_info(product_info_df)
        quantity = random.randint(1,20)
        discount = random.choices(discount_list, weights=(0.5,0.15,0.2,0.6))[0]
        ts = generate_date_time()
        
        data = {'ts_id': f'{str(uuid.uuid4())}',
                'ts':f'{ts}',
                'customer_id':customer_info_dict['customer_id'],
                'customer_name':customer_info_dict['customer_name'],
                'segment':customer_info_dict['segment'],
                'country':customer_info_dict['country'],
                'city':customer_info_dict['city'],
                'category':product_info_dict['category'],
                'sub_category':product_info_dict['sub_category'],
                'product_name':product_info_dict['product_name'],
                'price':product_info_dict['price'],
                'quantity':f'{quantity}',
                'discount':f'{discount}',
                'revenue': float(product_info_dict['price'])*quantity*(1-discount),
                'lat_long': get_lat_long(customer_info_dict['city'], customer_info_dict['country'])
                }
        print(f'Sending data: {data}')

In [71]:
data = generate_log(n_records = random.randint(1,10))
data['ts_id']

Sending data: {'ts_id': '81c88c25-4581-4894-96c3-5676d45f8a60', 'ts': '2023-12-30 18:47:33', 'customer_id': 'EM-14140', 'customer_name': 'Eugene Moren', 'segment': 'Home Office', 'country': 'United States', 'city': 'Santa Ana', 'category': 'Furniture', 'sub_category': 'Chairs', 'product_name': 'Office Star - Task Chair with Contemporary Loop Arms', 'price': 90.98, 'quantity': '20', 'discount': '0.3', 'revenue': 1273.72, 'lat_long': (33.7494951, -117.8732213)}
Sending data: {'ts_id': '4ec044ab-acf5-4a94-bea3-234d4448e384', 'ts': '2023-12-10 17:23:00', 'customer_id': 'HF-14995', 'customer_name': 'Herbert Flentye', 'segment': 'Consumer', 'country': 'United States', 'city': 'San Francisco', 'category': 'Technology', 'sub_category': 'Phones', 'product_name': 'AT&T CL2909', 'price': 125.98999999999998, 'quantity': '9', 'discount': '0', 'revenue': 1133.9099999999999, 'lat_long': (37.7790262, -122.419906)}
Sending data: {'ts_id': 'b23f4a57-3ec7-4e63-8063-60e9b3cbb2f9', 'ts': '2023-12-23 08:46:

TypeError: 'NoneType' object is not subscriptable

In [72]:
data

In [54]:
generate_log(n_records=4)

Sending data: {'ts_id': 'cf78cd61-343a-4dbf-a0b5-fb3be789608d', 'ts': '2023-12-27 06:57:47', 'customer_id': 'NK-18490', 'customer_name': 'Neil Knudson', 'segment': 'Home Office', 'country': 'United States', 'city': 'Corpus Christi', 'category': 'Office Supplies', 'sub_category': 'Paper', 'product_name': 'Geographics Note Cards, Blank, White, 8 1/2" x 11"', 'price': 11.19, 'quantity': '20', 'discount': '0', 'revenue': 223.79999999999998, 'lat_long': (27.7635302, -97.4033191)}
Sending data: {'ts_id': '5aa09c05-13c3-400a-9f02-bc56ee011d3a', 'ts': '2023-12-15 19:14:32', 'customer_id': 'JC-15775', 'customer_name': 'John Castell', 'segment': 'Consumer', 'country': 'United States', 'city': 'Newark', 'category': 'Technology', 'sub_category': 'Phones', 'product_name': 'Ooma Telo VoIP Home Phone System', 'price': 125.99000000000001, 'quantity': '13', 'discount': '0.4', 'revenue': 982.722, 'lat_long': (40.735657, -74.1723667)}
Sending data: {'ts_id': 'b4d228ee-54cf-4a7e-bee4-6636d8399059', 'ts': 

In [114]:
ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
ts

'2023-12-13 00:11:56'

In [111]:
customer_info_df.columns

Index(['customer_id', 'customer_name', 'segment', 'country', 'city'], dtype='object')

In [127]:
discount_list = ['0.5', '0.4', '0.3', '0']
random.choices(discount_list, weights=(5,15,20,60))[0]

'0.4'

In [None]:
        quantity = random.randint(1,20)
        discount = random.choices(discount_list, weights=(0.5,0.15,0.2,0.6))[0]


In [135]:
product_info_df = extract_product_info(sample_df)
product_info_dict = generate_product_info(product_info_df)
float(product_info_dict['price'])

130.98

In [136]:
quantity = random.randint(1,20)
quantity

1

In [73]:
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
def get_lat_long(city, country):
    geolocator = Nominatim(user_agent='hauct_geopy_key')
    location = geolocator.geocode(f'{city}, {country}', timeout=10)

    if location:
        lat_long = f"({location.latitude}, {location.longitude})"
        return lat_long

    return None


In [74]:
city = 'Viet Nam'
country = 'Hue'
get_lat_long(city, country)

'(16.4639321, 107.5863388)'

In [75]:
type(get_lat_long(city, country))

str

In [76]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [77]:
spark = (SparkSession.builder
            .appName("StoreAnalysis")
            .master("local[*]")  # Use local Spark execution with all available cores
            .config("spark.jars.packages",
                    "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.0")  # Spark-Kafka integration
            .config("spark.jars",
                    "postgresql-42.7.1.jar")  # PostgreSQL driver
            .config("spark.sql.adaptive.enabled", "false")  # Disable adaptive query execution
            .getOrCreate())

In [80]:
df = spark.read.option("header", True).csv('sample_superstore.csv')
df.show(5,False)

+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+-----------------------------------------------------------+--------+--------+--------+--------+
|row_id|order_id      |order_date|ship_date |ship_mode     |customer_id|customer_name  |segment  |country      |city           |state     |postal_code|region|product_id     |category       |sub_category|product_name                                               |sales   |quantity|discount|profit  |
+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+-----------------------------------------------------------+--------+--------+--------+--------+
|1     |CA-2016-152156|11/8/2016 |11/11/2016|Second Class  |CG-12520   |Claire Gute    |Consumer |Un