In [63]:

import importlib
def reload_specific_modules():
    # List of modules to reloadlogisticslogistics
    modules_to_reload = [
        'etl.extract',
        'etl.transform',
        'connections.sql_connector',
        'utils.sql_data_access'
    ]
    
    for module_name in modules_to_reload:
        if module_name in sys.modules:
            module = sys.modules[module_name]
            importlib.reload(module)
        else:
            print(f"Module {module_name} is not imported.")
            
from etl.extract import extract_data
from etl.transform import *
from etl.load import load_tracker_data, load_receiver_data, load_address_data
from connections.sql_connector import get_mysql_engine
from utils.sql_data_access import get_ids_by_mongo_ids, get_address_id_and_type_by_mongo_ids
reload_specific_modules()


Connected to MongoDB


In [64]:
engine = get_mysql_engine()

In [65]:
import pymongo
client = get_mongo_client()
db = client['logisticstics']
print(db.list_collection_names())

countryCollection = db['country']
zoneCollection = db['zone']
starCollection = db['star']
cityCollection = db['city']
receiverCollection = db['receiver']
trackerCollection = db['tracker']
orderCollection_ = db['order']


Connected to MongoDB
['country', 'city', 'receiver', 'zone', 'order', 'star', 'tracker']


In [66]:
df = pd.json_normalize(orderCollection_.find())

In [67]:
# Extract relevant fields and rename them to match SQL model
df = df.rename(
columns={
    "_id": "mongo_id",
    "orderId": "order_number",
    "type": "type",
    "createdAt": "created_at",
    "updatedAt": "updated_at",
    "receiver": "receiver_mongo_id",
    "star": "star_mongo_id",
    "dropOffAddress.city": "dropoff_city_mongo_id",
    "pickupAddress.city": "pickup_city_mongo_id",
}
)

In [68]:
address_id_mapping = get_address_id_and_type_by_mongo_ids(list(df["mongo_id"].astype(str)))
address_id_mapping

{'67a210bf6261b52121ab3740': [(11, 'dropoff'), (1, 'pickup')],
 '67a210bf6261b52121ab3746': [(12, 'dropoff'), (2, 'pickup')],
 '67a210bf6261b52121ab374b': [(13, 'dropoff'), (3, 'pickup')],
 '67a210bf6261b52121ab3751': [(14, 'dropoff'), (4, 'pickup')],
 '67a210bf6261b52121ab3756': [(15, 'dropoff'), (5, 'pickup')],
 '67a210bf6261b52121ab375a': [(16, 'dropoff'), (6, 'pickup')],
 '67a210bf6261b52121ab3760': [(17, 'dropoff'), (7, 'pickup')],
 '67a210c06261b52121ab3766': [(18, 'dropoff'), (8, 'pickup')],
 '67a210c06261b52121ab376b': [(19, 'dropoff'), (9, 'pickup')],
 '67a210c06261b52121ab3771': [(20, 'dropoff'), (10, 'pickup')]}

In [69]:
pickup_address_id_mapping = {}
dropoff_address_id_mapping = {}

# Populate the mappings based on the type
for address_id, address_data in address_id_mapping.items():
    for id_num, address_type in address_data:
        if address_type == "pickup":
            pickup_address_id_mapping[address_id] = id_num
        elif address_type == "dropoff":
            dropoff_address_id_mapping[address_id] = id_num
        else:
            print("Unknown address type:", address_type)

In [86]:
orderCollection_ = orderCollection.find()
confirmation_data = []
for order in orderCollection_:
    confirmation = order.get("confirmation", {})
    confirmation_data.append(
        {
            "order_mongo_id": order["_id"],
            "is_confirmed": confirmation.get("isConfirmed", False),
            "number_of_sms_trials": confirmation.get("numberOfSmsTrials", 0),
        }
    )

df = pd.DataFrame(confirmation_data)
df = convert_id_columns_to_string(df)
order_id_mapping = get_ids_by_mongo_ids("orders", list(df["order_mongo_id"]))
print(order_id_mapping)
df["order_id"] = df["order_mongo_id"].map(order_id_mapping)
df

{'67a210bf6261b52121ab3740': 1, '67a210bf6261b52121ab3746': 2, '67a210bf6261b52121ab374b': 3, '67a210bf6261b52121ab3751': 4, '67a210bf6261b52121ab3756': 5, '67a210bf6261b52121ab375a': 6, '67a210bf6261b52121ab3760': 7, '67a210c06261b52121ab3766': 8, '67a210c06261b52121ab376b': 9, '67a210c06261b52121ab3771': 10}


Unnamed: 0,order_mongo_id,is_confirmed,number_of_sms_trials,order_id
0,67a210bf6261b52121ab3740,True,0.0,1
1,67a210bf6261b52121ab3746,False,0.0,2
2,67a210bf6261b52121ab374b,False,0.0,3
3,67a210bf6261b52121ab3751,False,0.0,4
4,67a210bf6261b52121ab3756,False,0.0,5
5,67a210bf6261b52121ab375a,False,0.0,6
6,67a210bf6261b52121ab3760,False,0.0,7
7,67a210c06261b52121ab3766,False,0.0,8
8,67a210c06261b52121ab376b,False,0.0,9
9,67a210c06261b52121ab3771,False,0.0,10


In [71]:
order_id_mapping = get_ids_by_mongo_ids(
        "orders", list(df["order_mongo_id"])
    )
order_id_mapping

{'67a210bf6261b52121ab3740': 1,
 '67a210bf6261b52121ab3746': 2,
 '67a210bf6261b52121ab374b': 3,
 '67a210bf6261b52121ab3751': 4,
 '67a210bf6261b52121ab3756': 5,
 '67a210bf6261b52121ab375a': 6,
 '67a210bf6261b52121ab3760': 7,
 '67a210c06261b52121ab3766': 8,
 '67a210c06261b52121ab376b': 9,
 '67a210c06261b52121ab3771': 10}

In [72]:

# Convert ID columns to string type
df = df.astype({col: str for col in df.columns if '_id' in col.lower()})

In [73]:
receiver_id_mapping = get_ids_by_mongo_ids(
        "receivers", list(df["receiver_mongo_id"])
    )
star_id_mapping = get_ids_by_mongo_ids("stars", list(df["star_mongo_id"]))

print("receiver_id_mapping")
print(receiver_id_mapping)
print("star_id_mapping")
print(star_id_mapping)

receiver_id_mapping
{'67a210bf6261b52121ab373d': 1, '67a210bf6261b52121ab3743': 2, '67a210bf6261b52121ab3748': 3, '67a210bf6261b52121ab374e': 4, '67a210bf6261b52121ab3753': 5, '67a210bf6261b52121ab3757': 6, '67a210bf6261b52121ab375d': 7, '67a210bf6261b52121ab3763': 8, '67a210c06261b52121ab3768': 9, '67a210c06261b52121ab376e': 10}
star_id_mapping
{'67a210bf6261b52121ab373e': 1, '67a210bf6261b52121ab3744': 2, '67a210bf6261b52121ab3749': 3, '67a210bf6261b52121ab374f': 4, '67a210bf6261b52121ab3754': 5, '67a210bf6261b52121ab3758': 6, '67a210bf6261b52121ab375e': 7, '67a210bf6261b52121ab3764': 8, '67a210c06261b52121ab3769': 9, '67a210c06261b52121ab376f': 10}


In [74]:
df["receiver_id"] = df["receiver_mongo_id"].map(receiver_id_mapping)
df["receiver_id"]

0     1
1     2
2     3
3     4
4     5
5     6
6     7
7     8
8     9
9    10
Name: receiver_id, dtype: int64

In [75]:
df["star_id"] = df["star_mongo_id"].map(star_id_mapping)
df["star_id"]

0     1
1     2
2     3
3     4
4     5
5     6
6     7
7     8
8     9
9    10
Name: star_id, dtype: int64

In [76]:
df["dropoff_address_id"] = df["mongo_id"].map(dropoff_address_id_mapping)
df["pickup_address_id"] = df["mongo_id"].map(pickup_address_id_mapping)

In [77]:
df[
    [
        "mongo_id",
        "order_number",
        "type",
        "pickup_address_id",
        "dropoff_address_id",
        "receiver_id",
        "star_id",
        "created_at",
        "updated_at",
    ]
]

Unnamed: 0,mongo_id,order_number,type,pickup_address_id,dropoff_address_id,receiver_id,star_id,created_at,updated_at
0,67a210bf6261b52121ab3740,4233895-0,SEND,1,11,1,1,2025-02-04 15:06:07.760,2025-02-04 13:06:07.760
1,67a210bf6261b52121ab3746,4233895-1,SEND,2,12,2,2,2025-02-04 15:06:07.820,2025-02-04 13:06:07.821
2,67a210bf6261b52121ab374b,4233895-2,SEND,3,13,3,3,2025-02-04 15:06:07.849,2025-02-04 13:06:07.850
3,67a210bf6261b52121ab3751,4233895-3,SEND,4,14,4,4,2025-02-04 15:06:07.880,2025-02-04 13:06:07.882
4,67a210bf6261b52121ab3756,4233895-4,SEND,5,15,5,5,2025-02-04 15:06:07.908,2025-02-04 13:06:07.909
5,67a210bf6261b52121ab375a,4233895-5,SEND,6,16,6,6,2025-02-04 15:06:07.936,2025-02-04 13:06:07.937
6,67a210bf6261b52121ab3760,4233895-6,SEND,7,17,7,7,2025-02-04 15:06:07.967,2025-02-04 13:06:07.968
7,67a210c06261b52121ab3766,4233895-7,SEND,8,18,8,8,2025-02-04 15:06:08.000,2025-02-04 13:06:08.000
8,67a210c06261b52121ab376b,4233895-8,SEND,9,19,9,9,2025-02-04 15:06:08.052,2025-02-04 13:06:08.053
9,67a210c06261b52121ab3771,4233895-9,SEND,10,20,10,10,2025-02-04 15:06:08.093,2025-02-04 13:06:08.094


In [78]:
pd.json_normalize(countryCollection.find({'updatedAt':{'$gt':LAST_UPDATED}}))

Unnamed: 0,_id,name,code,createdAt,updatedAt
0,67a210bf6261b52121ab373a,Egypt,EG,2025-02-04 13:06:07.390,2025-02-04 13:06:07.390


In [79]:
from etl.extract import extract_data
from etl.transform import *
from etl.load import load_tracker_data, load_receiver_data, load_address_data
from connections.sql_connector import get_mysql_engine
from utils.sql_data_access import get_ids_by_mongo_ids, get_address_id_and_type_by_mongo_ids
reload_specific_modules()

engine = get_mysql_engine()


for bulk in extract_data("order"):
    df = transform_order_data(bulk)
df

Connected to MongoDB
No more records for order


Unnamed: 0,mongo_id,order_number,type,pickup_address_id,dropoff_address_id,receiver_id,star_id,created_at,updated_at
0,67a210bf6261b52121ab3740,4233895-0,SEND,1,11,1,1,2025-02-04 15:06:07.760,2025-02-04 13:06:07.760
1,67a210bf6261b52121ab3746,4233895-1,SEND,2,12,2,2,2025-02-04 15:06:07.820,2025-02-04 13:06:07.821
2,67a210bf6261b52121ab374b,4233895-2,SEND,3,13,3,3,2025-02-04 15:06:07.849,2025-02-04 13:06:07.850
3,67a210bf6261b52121ab3751,4233895-3,SEND,4,14,4,4,2025-02-04 15:06:07.880,2025-02-04 13:06:07.882
4,67a210bf6261b52121ab3756,4233895-4,SEND,5,15,5,5,2025-02-04 15:06:07.908,2025-02-04 13:06:07.909
5,67a210bf6261b52121ab375a,4233895-5,SEND,6,16,6,6,2025-02-04 15:06:07.936,2025-02-04 13:06:07.937
6,67a210bf6261b52121ab3760,4233895-6,SEND,7,17,7,7,2025-02-04 15:06:07.967,2025-02-04 13:06:07.968
7,67a210c06261b52121ab3766,4233895-7,SEND,8,18,8,8,2025-02-04 15:06:08.000,2025-02-04 13:06:08.000
8,67a210c06261b52121ab376b,4233895-8,SEND,9,19,9,9,2025-02-04 15:06:08.052,2025-02-04 13:06:08.053
9,67a210c06261b52121ab3771,4233895-9,SEND,10,20,10,10,2025-02-04 15:06:08.093,2025-02-04 13:06:08.094


In [82]:
df.columns

Index(['order_mongo_id', 'first_line', 'second_line', 'district', 'floor',
       'apartment', 'geo_location', 'zone_id', 'city_id', 'country_id',
       'created_at', 'updated_at'],
      dtype='object')

In [64]:
countryDataFrame = pd.json_normalize(countryCollection.find())
zoneDataFrame = pd.json_normalize(zoneCollection.find())
starDataFrame = pd.json_normalize(starCollection.find())
cityDataFrame = pd.json_normalize(cityCollection.find())
receiverDataFrame = pd.json_normalize(receiverCollection.find())
trackerDataFrame = pd.json_normalize(trackerCollection.find())
orderDataFrame = pd.json_normalize(orderCollection_.find())

NameError: name 'countryCollection' is not defined

In [11]:
countryDataFrame

Unnamed: 0,_id,name,code,createdAt,updatedAt
0,67a210bf6261b52121ab373a,Egypt,EG,2025-02-04 13:06:07.390,2025-02-04 13:06:07.390
