## Kafka Data Ingestion Procedures

In [116]:
import pandas as pd
import numpy as np 

from faker import Faker
from pprint import pprint
from faker.providers import BaseProvider


In [121]:

# Custom provider for material names
class MaterialProvider(BaseProvider):
    def material(self):
        materials = [
            'Wood', 'Steel', 'Plastic', 'Glass', 'Aluminum', 'Copper', 'Brass', 'Concrete', 'Rubber', 'Ceramic',
            'Titanium', 'Carbon Fiber', 'Fiberglass', 'Leather', 'Silicone', 'Polycarbonate', 'Acrylic', 'Nylon',
            'Polyester', 'PVC', 'Teflon', 'Kevlar', 'Graphene', 'Bamboo', 'Marble', 'Granite', 'Quartz', 'Slate'
        ]
        return self.random_element(materials)

# Create a Faker instance and add the custom provider
fake = Faker()
fake.add_provider(MaterialProvider)

## Create kafka admin

In [318]:
from confluent_kafka.admin import AdminClient, NewTopic

# Configuration for connecting to Kafka
admin_conf = {
    'bootstrap.servers': 'localhost:29092',  # Assuming Kafka is exposed on localhost:9092
}

# Create an AdminClient instance
admin_client = AdminClient(admin_conf)


In [294]:
admin_client.list_topics().topics.keys()

dict_keys(['__transaction_state', 'inventory_status', '_confluent-ksql-default__command_topic', 'products', '__consumer_offsets', 'suppliers', 'shipping_status', 'manufacturers', 'supplied_material_status', '_confluent-ksql-default_transient_transient_CUSTOMERS_4470057005752031729_1729351420396-KsqlTopic-Reduce-changelog', 'distributors', 'order_info', '_schemas', 'retailers', 'customers', 'supplier_manufacturer_contract'])

In [317]:
admin_client.delete_topics(['order_info'])

{'order_info': <Future at 0x7fbabee2b190 state=running>}

%3|1729801939.071|FAIL|rdkafka#producer-25| [thrd:localhost:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 9ms in state CONNECT, 1 identical error(s) suppressed)
%3|1729801969.827|FAIL|rdkafka#producer-25| [thrd:localhost:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 109ms in state CONNECT, 30 identical error(s) suppressed)
%3|1729802019.917|FAIL|rdkafka#producer-24| [thrd:localhost:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 16ms in state CONNECT)
%5|1729802020.919|PARTCNT|rdkafka#producer-24| [thrd:main]: Topic order_info partition count changed from 2 to 1


### Create the topics

In [319]:

# Define the new topics to create (you can add more topics here)
topics = [
    NewTopic("suppliers", num_partitions=2, replication_factor=1),
    NewTopic("manufacturers", num_partitions=2, replication_factor=1),
    NewTopic("products", num_partitions=2, replication_factor=1),
    NewTopic("distributors", num_partitions=2, replication_factor=1),
    NewTopic("retailers", num_partitions=2, replication_factor=1),
    NewTopic("customers", num_partitions=2, replication_factor=1),
    NewTopic("orders_status", num_partitions=2, replication_factor=1),
    NewTopic("inventory_status", num_partitions=2, replication_factor=1),
    NewTopic("shipping_status", num_partitions=2, replication_factor=1)
]
topics = [
    # NewTopic("supplier_manufacturer_contract", num_partitions=2, replication_factor=1),
    NewTopic("order_info", num_partitions=2, replication_factor=1)
]
# Create the topics
try:
    topic_creation_futures = admin_client.create_topics(topics)
    
    # Wait for the results of the topic creation (optional)
    for topic, future in topic_creation_futures.items():
        try:
            future.result()  # The result itself is None if the topic is created successfully
            print(f"Topic {topic} created successfully.")
        except Exception as e:
            print(f"{e}")
except Exception as e:
    print(f"Error during topic creation: {e}")


KafkaError{code=_TRANSPORT,val=-195,str="Failed while waiting for response from broker: Local: Broker transport failure"}


%3|1729802026.990|FAIL|rdkafka#producer-26| [thrd:localhost:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 16ms in state CONNECT)


## Create Suppliers DB

In [25]:
suppliers_data = []
supplier = {}

for i in range(100):
    supplier = {}
    supplier['SupplierID'] = f'S{i}'
    supplier['SupplierName'] = faker.company() + ' ' + faker.company_suffix()
    supplier['City'] = faker.city()
    supplier['State'] = faker.state()
    supplier['ZipCode'] = faker.zipcode()
    supplier['Country'] = faker.country()
    supplier['Rating'] = np.random.randint(0,6)
    suppliers_data.append(supplier)
    

In [26]:
df_supplier = pd.DataFrame.from_dict(suppliers_data)
df_supplier

Unnamed: 0,SupplierID,SupplierName,City,State,ZipCode,Country,Rating
0,S0,Martin-Black and Sons,Tatefurt,Indiana,99583,Saint Lucia,3
1,S1,Newton Inc Group,East Daniel,Washington,94625,El Salvador,0
2,S2,Allen Group and Sons,Kevinshire,Maryland,44450,Egypt,3
3,S3,Washington Inc Ltd,West Amber,California,84010,United States Minor Outlying Islands,1
4,S4,"Bryant, Smith and Randolph Group",North Barbaraview,Washington,40206,Netherlands Antilles,0
...,...,...,...,...,...,...,...
95,S95,"Weber, Herrera and Ross LLC",Ryanborough,Georgia,71305,Uruguay,4
96,S96,"Perez, Mclean and Mcpherson LLC",New Elizabeth,Mississippi,33145,Martinique,0
97,S97,Jacobson Ltd Group,Mariaport,North Carolina,40405,Korea,3
98,S98,Salas LLC Ltd,New Josephmouth,New Jersey,99939,Algeria,2


In [27]:
from confluent_kafka import Producer
import json

conf = {'bootstrap.servers': 'localhost:29092'}
producer = Producer(conf)

for supplier in suppliers_data:
    producer.produce('suppliers', key=supplier['SupplierID'], value=json.dumps(supplier))

producer.flush()


0

## Create Manufacturers DB

In [7]:
manufacturers_data = []
supplier = {}

for i in range(30):
    supplier = {}
    supplier['ManufacturerID'] = f'M{i}'
    supplier['ManufacturerName'] = faker.company() + ' ' + faker.company_suffix()
    supplier['City'] = faker.city()
    supplier['State'] = faker.state()
    supplier['ZipCode'] = faker.zipcode()
    supplier['Country'] = faker.country()
    supplier['ProductionCapacity'] = np.random.randint(10,100)*100
    manufacturers_data.append(supplier)
    

In [8]:
df_manufacturers = pd.DataFrame.from_dict(manufacturers_data)
df_manufacturers

Unnamed: 0,ManufacturerID,ManufacturerName,City,State,ZipCode,Country,ProductionCapacity
0,M0,Young Inc and Sons,Jennifermouth,Utah,37544,Svalbard & Jan Mayen Islands,9100
1,M1,"Richardson, Price and Holmes and Sons",Woodsstad,South Dakota,93766,Uruguay,7900
2,M2,Howard-Howell LLC,South John,Illinois,62512,Jamaica,4600
3,M3,"Nguyen, Myers and Reed Group",Lauriemouth,Georgia,79315,Brazil,9700
4,M4,Davis-Brock PLC,Markfort,Vermont,96459,Kyrgyz Republic,4800
5,M5,"Ruiz, Klein and Harrell LLC",Rebeccashire,Delaware,17256,Mexico,7800
6,M6,Pruitt LLC LLC,Gloriaton,Missouri,65580,Bangladesh,1000
7,M7,Fox-Pham PLC,West Michael,Idaho,78004,Romania,6700
8,M8,Maldonado-Sullivan and Sons,West Jessicaside,Arkansas,76616,Libyan Arab Jamahiriya,6800
9,M9,"Rose, White and Cox LLC",Calderonland,Arkansas,88758,Togo,3600


In [9]:
from confluent_kafka import Producer
import json

conf = {'bootstrap.servers': 'localhost:29092'}
producer = Producer(conf)

for manufacturer in manufacturers_data:
    producer.produce('manufacturers', key=manufacturer['ManufacturerID'], value=json.dumps(manufacturer))

producer.flush()


0

%3|1729788915.254|FAIL|rdkafka#producer-2| [thrd:localhost:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 57ms in state CONNECT)


### Create Supplier Manufacturer Contract

In [18]:
df_manufacturers.shape[0], df_supplier.shape[0]

(30, 100)

In [83]:
## Randomly assign each supplier to at least one manufacturer

# 1: Assume df_suppliers and df_manufacturers are already available
df_suppliers = pd.DataFrame({
    'Supplier_ID': [f'S{i}' for i in range(100)]  # 100 suppliers from S0 to S99
})

df_manufacturers = pd.DataFrame({
    'Manufacturer_ID': [f'M{i}' for i in range(30)]  # 30 manufacturers from M0 to M29
})

# 2: Ensure every supplier has at least one manufacturer and vice versa
# Randomly assign each supplier to at least one manufacturer
mandatory_pairings = pd.DataFrame({
    'Supplier_ID': np.repeat(df_suppliers['Supplier_ID'].values, repeats=3),  # One contract per supplier
    'Manufacturer_ID': np.random.choice(df_manufacturers['Manufacturer_ID'], size=300, replace=True)
})

# Randomly assign each manufacturer to at least one supplier
manufacturer_pairings = pd.DataFrame({
    'Supplier_ID': np.random.choice(df_suppliers['Supplier_ID'], size=30, replace=True),  # One contract per manufacturer
    'Manufacturer_ID': df_manufacturers['Manufacturer_ID']  # Each manufacturer gets one contract
})

# 3: Concatenate the two lists to ensure full coverage
# Now all suppliers and manufacturers have at least one contract
initial_pairings = pd.concat([mandatory_pairings, manufacturer_pairings]).drop_duplicates()

# 4: Add additional random many-to-many pairings
additional_pairings = pd.DataFrame({
    'Supplier_ID': np.random.choice(df_suppliers['Supplier_ID'], size=100, replace=True),  # Additional contracts
    'Manufacturer_ID': np.random.choice(df_manufacturers['Manufacturer_ID'], size=100, replace=True)
})

# 5: Combine initial pairings with additional pairings
final_pairings = pd.concat([initial_pairings, additional_pairings]).drop_duplicates()

# 6: Generate Contract IDs
final_pairings['Supply_Contract_ID'] = [f'SC{i}' for i in range(1, len(final_pairings) + 1)]

# 7: Add Contract Start Dates
final_pairings['Contract_Start_Date'] = pd.date_range(start='2024-01-01', periods=len(final_pairings), freq='D')

# 8: Randomly add a Contract_End_Date between 6 months and 6 years (in 6-month intervals)
# Create a list of possible durations (in months) between 6 months and 6 years
possible_durations = np.arange(6, 73, 6)  # From 6 months to 72 months (6 years)

# 9: Assign a random duration for each contract and use pd.DateOffset to add months
final_pairings['Contract_End_Date'] = final_pairings.apply(
    lambda row: row['Contract_Start_Date'] + pd.DateOffset(months=np.random.choice(possible_durations)), axis=1
)

# 10: Reset the index and display the resulting DataFrame
final_pairings.reset_index(drop=True, inplace=True)
df_final = final_pairings.sample(frac=1, random_state=42).reset_index(drop=True)


In [89]:
# unique supplier contract IDs
df_final.Supply_Contract_ID.nunique(), df_final.shape[0]

(408, 408)

In [101]:
contracts_data = []

for i in range(df_final.shape[0]):
    contract = {}
    contract['Supply_Contract_ID'] = df_final.iloc[i]['Supply_Contract_ID']
    contract['Supplier_ID'] = df_final.iloc[i]['Supplier_ID']   
    contract['Manufacturer_ID'] = df_final.iloc[i]['Manufacturer_ID']   
    contract['Contract_Start_Date'] = df_final.iloc[i]['Contract_Start_Date'].strftime('%Y-%m-%d')  
    contract['Contract_End_Date'] = df_final.iloc[i]['Contract_End_Date'].strftime('%Y-%m-%d')   
    contracts_data.append(contract)
    

In [103]:
contracts_data[0]

{'Supply_Contract_ID': 'SC71',
 'Supplier_ID': 'S24',
 'Manufacturer_ID': 'M25',
 'Contract_Start_Date': '2024-03-11',
 'Contract_End_Date': '2028-09-11'}

In [107]:
from confluent_kafka import Producer
import json

conf = {'bootstrap.servers': 'localhost:29092'}
producer = Producer(conf)

for contract in contracts_data:
    producer.produce('supplier_manufacturer_contract', key=contract['Supply_Contract_ID'], value=json.dumps(contract))

producer.flush()


0

### Create Stream Supplied Material

In [236]:
# Define list of materials and possible durations
materials = [
    'Wood', 'Steel', 'Plastic', 'Glass', 'Aluminum', 'Copper', 'Brass', 'Concrete', 'Rubber', 'Ceramic',
    'Titanium', 'Carbon Fiber', 'Fiberglass', 'Leather', 'Silicone', 'Polycarbonate', 'Acrylic', 'Nylon',
    'Polyester', 'PVC', 'Teflon', 'Kevlar', 'Graphene', 'Bamboo', 'Marble', 'Granite', 'Quartz', 'Slate'
]
possible_durations = np.arange(1, 12, 1)  # From 1 to 11 months
delivery_days_options = [1, 3, 5, 7, 9, 13, 15]  # Random delivery times

all_contracts = df_final['Supply_Contract_ID'].unique()

materials_data = []

for i in range(len(all_contracts) * 3):
    material = {}
    material['Supplied_Material_ID'] = f'SM{i}'
    selected_contract = np.random.choice(all_contracts)
    material['Supply_Contract_ID'] = selected_contract
    material['Material_Name'] = np.random.choice(materials)
    material['Material_Price'] = round(np.random.uniform(100, 10000), 2)
    material['Material_Quantity'] = np.random.randint(100, 1000)
    contract_start_date = df_final.loc[df_final['Supply_Contract_ID'] == selected_contract, 'Contract_Start_Date'].values[0]
    requested_date = pd.to_datetime(contract_start_date) + pd.DateOffset(months=np.random.choice(possible_durations))
    material['Requested_Date'] = requested_date.strftime('%Y-%m-%d')
    delivery_date = requested_date + pd.to_timedelta(np.random.choice(delivery_days_options), unit='D')
    material['Delivery_Date'] = delivery_date.strftime('%Y-%m-%d')
    material['Delivery_Status'] = np.random.choice(['Pending', 'In Progress', 'Delivered'])
    materials_data.append(material)



In [242]:
df_materials = pd.DataFrame(materials_data)
df_materials

Unnamed: 0,Supplied_Material_ID,Supply_Contract_ID,Material_Name,Material_Price,Material_Quantity,Requested_Date,Delivery_Date,Delivery_Status
0,SM0,SC78,Rubber,5026.30,963,2024-05-18,2024-05-27,In Progress
1,SM1,SC11,Polycarbonate,6984.03,723,2024-11-11,2024-11-18,In Progress
2,SM2,SC291,Graphene,9482.59,370,2025-02-17,2025-02-24,Pending
3,SM3,SC365,Glass,7858.07,814,2025-06-30,2025-07-01,Delivered
4,SM4,SC222,Steel,9411.12,741,2024-12-09,2024-12-22,Pending
...,...,...,...,...,...,...,...,...
1219,SM1219,SC342,Acrylic,5113.57,530,2025-10-07,2025-10-08,In Progress
1220,SM1220,SC383,Quartz,6895.61,211,2025-08-17,2025-08-22,Delivered
1221,SM1221,SC156,Kevlar,7072.20,324,2025-03-04,2025-03-09,Delivered
1222,SM1222,SC39,PVC,773.11,390,2024-09-08,2024-09-11,Delivered


In [244]:
df_materials[df_materials['Supply_Contract_ID'] == 'SC1']

Unnamed: 0,Supplied_Material_ID,Supply_Contract_ID,Material_Name,Material_Price,Material_Quantity,Requested_Date,Delivery_Date,Delivery_Status
131,SM131,SC1,Marble,2803.46,593,2024-08-01,2024-08-10,Delivered
700,SM700,SC1,Aluminum,3788.59,437,2024-04-01,2024-04-04,Pending
733,SM733,SC1,Nylon,4142.56,893,2024-12-01,2024-12-02,Delivered
998,SM998,SC1,Slate,6769.59,250,2024-10-01,2024-10-16,In Progress
1047,SM1047,SC1,Polyester,549.83,698,2024-06-01,2024-06-04,Pending
1160,SM1160,SC1,Granite,3328.61,996,2024-05-01,2024-05-16,In Progress
1167,SM1167,SC1,PVC,6540.38,524,2024-06-01,2024-06-02,Delivered


%3|1729796319.962|FAIL|rdkafka#producer-9| [thrd:localhost:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 13ms in state CONNECT)


In [243]:
from confluent_kafka import Producer
import json

conf = {'bootstrap.servers': 'localhost:29092'}
producer = Producer(conf)

for material in materials_data:
    producer.produce('supplied_material_status', key=material['Supplied_Material_ID'], value=json.dumps(material))

producer.flush()


0

## Create Distributors DB

In [6]:
distributors_data = []
distributor = {}

for i in range(30):
    distributor = {}
    distributor['DistributorID'] = f'D{i}'
    distributor['DistributorName'] = faker.company() + ' ' + faker.company_suffix()
    distributor['City'] = faker.city()
    distributor['State'] = faker.state()
    distributor['ZipCode'] = faker.zipcode()
    distributor['Country'] = faker.country()
    distributor['DistributionArea'] = str(np.random.randint(1,100))+'km.sq'
    distributors_data.append(distributor)
    

In [7]:
df_distributors = pd.DataFrame.from_dict(distributors_data)
df_distributors

Unnamed: 0,DistributorID,DistributorName,City,State,ZipCode,Country,DistributionArea
0,D0,"Lynch, Ali and Hall Group",Ayalaport,Arizona,83455,Cote d'Ivoire,80km.sq
1,D1,Vaughan Inc LLC,North Jefferyport,Nebraska,35895,Pakistan,66km.sq
2,D2,Freeman-Bentley Inc,East Meganfurt,West Virginia,81191,Bangladesh,18km.sq
3,D3,Stephens-Andersen and Sons,North Shelleystad,Delaware,77662,Burkina Faso,53km.sq
4,D4,Jackson and Sons PLC,Justinbury,New Jersey,67481,Lao People's Democratic Republic,62km.sq
5,D5,Gordon PLC and Sons,North Tylerland,Illinois,46961,Georgia,42km.sq
6,D6,Flores and Sons Ltd,Rebeccabury,South Dakota,71194,Malawi,24km.sq
7,D7,Salazar-Ramirez PLC,South Melanieton,Arkansas,97575,Mexico,76km.sq
8,D8,"Rice, Fisher and Jenkins PLC",Devinview,Texas,7165,Martinique,63km.sq
9,D9,"Smith, Kaufman and Villa Group",Roberttown,Mississippi,43133,Tokelau,58km.sq


In [19]:
from confluent_kafka import Producer
import json

conf = {'bootstrap.servers': 'localhost:29092'}
producer = Producer(conf)

for distributor in distributors_data:
    producer.produce('distributors', key=distributor['DistributorID'], value=json.dumps(distributor))

producer.flush()


0

## Create Retailers DB

In [2]:
import pandas as pd

In [3]:
df_lat_long = pd.read_csv('./DataCoSupplyChainDataset.csv', usecols=['Latitude', 'Longitude', 'Customer City', 'Customer State', 'Customer Zipcode', 'Customer Country'], encoding='ISO-8859-1')

In [4]:
df_lat_long.shape


(180519, 6)

In [18]:
df_lat_long.Latitude.nunique(), df_lat_long.Longitude.nunique()

(11250, 4487)

In [49]:
# Step 1: Group by 'longitude' and 'latitude', and count occurrences
grouped = df_lat_long.groupby(['Longitude', 'Latitude']).size().reset_index(name='count')

# Step 2: Find the most frequent latitude for each longitude
most_frequent = grouped.loc[grouped.groupby('Longitude')['count'].idxmax()]

# Step 3: Drop the 'count' column since it's no longer needed
most_frequent = most_frequent[['Longitude', 'Latitude']]




In [72]:
df_lat_long.groupby(['Longitude', 'Latitude']).first()

Unnamed: 0_level_0,Unnamed: 1_level_0,Customer City,Customer Country,Customer State,Customer Zipcode
Longitude,Latitude,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
-158.025986,21.337139,Ewa Beach,EE. UU.,HI,96706.0
-158.016037,21.447960,Mililani,EE. UU.,HI,96789.0
-158.010223,21.453259,Mililani,EE. UU.,HI,96789.0
-158.004761,21.388741,Waipahu,EE. UU.,HI,96797.0
-158.004684,21.311855,Ewa Beach,EE. UU.,HI,96706.0
...,...,...,...,...,...
77.092094,38.911991,Washington,EE. UU.,DC,20019.0
82.461998,28.007509,Tampa,EE. UU.,FL,33614.0
84.742676,33.728558,Douglasville,EE. UU.,GA,30135.0
115.049789,36.163929,Las Vegas,EE. UU.,NV,89119.0


In [67]:
grouped

Unnamed: 0,Longitude,Latitude,count
0,-158.025986,21.337139,73
1,-158.016037,21.447960,134
2,-158.010223,21.453259,18
3,-158.004761,21.388741,7
4,-158.004684,21.311855,43
...,...,...,...
11830,77.092094,38.911991,32
11831,82.461998,28.007509,22
11832,84.742676,33.728558,8
11833,115.049789,36.163929,17


In [51]:
most_frequent.reset_index(drop=True, inplace=True)

In [73]:
# Merge the dataframes on Latitude and Longitude
merged_df = pd.merge(most_frequent, df_lat_long, on=['Latitude', 'Longitude'], how='inner')


In [84]:
merged_df.drop_duplicates(subset=['Longitude','Latitude'],keep='first',ignore_index=True, inplace=True)

In [85]:
merged_df

Unnamed: 0,Longitude,Latitude,Customer City,Customer Country,Customer State,Customer Zipcode
0,-158.025986,21.337139,Ewa Beach,EE. UU.,HI,96706.0
1,-158.016037,21.447960,Mililani,EE. UU.,HI,96789.0
2,-158.010223,21.453259,Mililani,EE. UU.,HI,96789.0
3,-158.004761,21.388741,Waipahu,EE. UU.,HI,96797.0
4,-158.004684,21.311855,Ewa Beach,EE. UU.,HI,96706.0
...,...,...,...,...,...,...
4482,77.092094,38.911991,Washington,EE. UU.,DC,20019.0
4483,82.461998,28.007509,Tampa,EE. UU.,FL,33614.0
4484,84.742676,33.728558,Douglasville,EE. UU.,GA,30135.0
4485,115.049789,36.163929,Las Vegas,EE. UU.,NV,89119.0


In [86]:
retailers_data = []
retailer = {}

for i in range(merged_df.shape[0]): # 4487
    retailer = {}
    retailer['RetailerID'] = f'R{i}'
    retailer['RetailerName'] = faker.company() + ' ' + faker.company_suffix()
    retailer['Latitude'] = np.round(merged_df.iloc[i,1 ],3)
    retailer['Longitude'] = np.round(merged_df.iloc[i,0 ],3)
    retailer['City'] = merged_df.iloc[i,2 ]
    retailer['State'] = merged_df.iloc[i,4 ]
    retailer['ZipCode'] = merged_df.iloc[i,5 ]
    retailer['Country'] = merged_df.iloc[i,3 ]
    retailers_data.append(retailer)

In [88]:
from confluent_kafka import Producer
import json

conf = {'bootstrap.servers': 'localhost:29092'}
producer = Producer(conf)

for retailer in retailers_data:
    producer.produce('retailers', key=retailer['RetailerID'], value=json.dumps(retailer))

producer.flush()


0

%3|1729532432.694|FAIL|rdkafka#producer-2| [thrd:localhost:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 24ms in state CONNECT)


## Create Customers DB

In [259]:
df_customer = pd.read_csv('./DataCoSupplyChainDataset.csv', usecols=['Customer Fname', 'Customer Lname', 'Customer Id', 'Customer City', 'Customer State', 'Customer Zipcode','Customer Country', 'Customer Segment'], encoding='ISO-8859-1')

In [260]:
df_customer_new = df_customer.groupby(['Customer Id']).last().copy(deep=True)

In [261]:
df_customer_new['LoyalityPoints'] = df_customer.groupby('Customer Id').size()


In [262]:
df_customer_new.dtypes

Customer City        object
Customer Country     object
Customer Fname       object
Customer Lname       object
Customer Segment     object
Customer State       object
Customer Zipcode    float64
LoyalityPoints        int64
dtype: object

In [26]:
customers_data = []
customer = {}

for i in range(df_customer_new.shape[0]):
    customer = {}
    customer['CustomerID'] = f'C{df_customer_new.index[i]}'
    customer['FirstName'] = df_customer_new.iloc[i,2]
    customer['LastName'] = df_customer_new.iloc[i,3]
    customer['City'] = df_customer_new.iloc[i,0]
    customer['State'] = df_customer_new.iloc[i,5]
    customer['ZipCode'] = str(df_customer_new.iloc[i,6])
    customer['Country'] = df_customer_new.iloc[i,1]
    customer['LoyaltyPoints'] = int(df_customer_new.iloc[i,7])
    customer['CustomerSegment'] = df_customer_new.iloc[i,4]
    
    customers_data.append(customer)

In [27]:
from confluent_kafka import Producer
import json

conf = {'bootstrap.servers': 'localhost:29092'}
producer = Producer(conf)

for customer in customers_data:
    producer.produce('customers', key=customer['CustomerID'], value=json.dumps(customer))

producer.flush()


0

In [141]:
df_customer_new[df_customer_new['Customer Zipcode'].isna()]

Unnamed: 0_level_0,Customer City,Customer Country,Customer Fname,Customer Lname,Customer Segment,Customer State,Customer Zipcode,LoyalityPoints
Customer Id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
14046,CA,EE. UU.,Zena,Elliott,Consumer,91732,,1
14577,CA,EE. UU.,Sara,Foreman,Consumer,95758,,1
17171,CA,EE. UU.,Eugenia,Slater,Corporate,95758,,1


## Create Products

In [11]:
df_product = pd.read_csv('./DataCoSupplyChainDataset.csv', usecols=['Product Card Id', 'Product Name', 'Department Name', 'Category Name', 'Product Price'], encoding='ISO-8859-1')

In [12]:
# each product have the same price in the given dataset
df_product.groupby(['Product Price','Product Card Id']).count().shape 

(118, 3)

In [13]:
df_product_byID = df_product.groupby(['Product Card Id']).first()

In [14]:
# Create manufacturers and assign to the products DataFrame
manufacturers = [f'M{i}' for i in range(30)] 
num_products = 118
products = pd.DataFrame({'product_id': [f'P{i}' for i in range(num_products)]})

# Shuffle products and assign manufacturers ensuring each gets at least one
np.random.shuffle(products.values)  # Shuffle rows
assignments = [manufacturers[i % len(manufacturers)] for i in range(num_products)]

# Assign and randomize manufacturers in the DataFrame
np.random.shuffle(assignments)

df_product_byID['ManufacturerID'] = assignments


In [15]:
df_product_byID['WarrantyPeriod'] = [str(f'{np.random.choice([2,3,6,12,18])} - month') for _ in range(df_product_byID.shape[0])]

In [16]:
df_product_byID.head()

Unnamed: 0_level_0,Category Name,Department Name,Product Name,Product Price,ManufacturerID,WarrantyPeriod
Product Card Id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
19,Soccer,Fitness,Nike Men's Fingertrap Max Training Shoe,124.989998,M7,3 - month
24,Soccer,Fitness,Elevation Training Mask 2.0,79.989998,M23,6 - month
35,Baseball & Softball,Fitness,adidas Brazuca 2014 Official Match Ball,159.990005,M3,12 - month
37,Baseball & Softball,Fitness,adidas Kids' F5 Messi FG Soccer Cleat,34.990002,M0,2 - month
44,Baseball & Softball,Fitness,adidas Men's F10 Messi TRX FG Soccer Cleat,59.990002,M2,12 - month


In [28]:
products_data = []
product = {}

for i in range(df_product_byID.shape[0]):
    product = {}
    product['ProductID'] = f'P{df_product_byID.index[i]}'
    product['ProductName'] = df_product_byID.iloc[i,2]
    product['ProductDepartment'] = df_product_byID.iloc[i,1]
    product['ProductCategory'] = df_product_byID.iloc[i,0]
    product['UnitPrice'] = np.round(df_product_byID.iloc[i,3],2)
    product['ManufacturerID'] = str(df_product_byID.iloc[i,4])
    product['WarrantyPeriod'] = df_product_byID.iloc[i,5]
    product['ProductStatus'] = ''
    
    products_data.append(product)

In [29]:
conf = {'bootstrap.servers': 'localhost:29092'}
producer = Producer(conf)

for product in products_data:
    producer.produce('products', key=product['ProductID'], value=json.dumps(product))

producer.flush()


0

# Streams
## Order Stream

In [245]:
df_all_orders = pd.read_csv('./DataCoSupplyChainDataset.csv', encoding='ISO-8859-1')

In [320]:
## ingest in batches
start = 0
end = 1000
# Assuming df_all_orders is your DataFrame containing the order data
def create_order_dict(row):
    return {
        'OrderID': 'O' + str(row['Order Item Id']),
        'CustomerID': 'C' + str(row['Customer Id']),
        'OrderDate': row['order date (DateOrders)'],
        'TotalAmount': round(row['Sales per customer'], 2),
        'OrderStatus': row['Order Status'],
        'PaymentMethod': row['Type'],
        'OrderItemDiscount': np.round(row['Order Item Discount'],2),
        'OrderItemDiscountRate': np.round(row['Order Item Discount Rate'],2),
        'OrderItemTotal': np.round(row['Order Item Total'],2),
        'OrderProfitPerOrder': np.round(row['Order Profit Per Order'],2),
        'OrderQuantity': int(row['Order Item Quantity']),
        'OrderCity': row['Order City'],
        'OrderState': row['Order State'],
        'OrderCountry': row['Order Country'],
        'OrderRegion': row['Order Region'],
        'ShippingID': '',
        'RetailerID': ''
    }

# Create the orders_data list using apply
orders_data = df_all_orders.iloc[start:end].apply(create_order_dict, axis=1).tolist()

conf = {'bootstrap.servers': 'localhost:29092'}
producer = Producer(conf)

# Send orders_data to Kafka in batches
for order in orders_data:
    producer.produce('order_info', key=order['OrderID'], value=json.dumps(order))

# Flush the producer to ensure all messages are sent
producer.flush()


0

### Inventory Stream