In [1]:
import os

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pymongo import MongoClient
from pymongo import UpdateOne

##### Source Folders

In [2]:
# Base directory, read from the HOST_PATH environment variable.
# NOTE(review): when HOST_PATH is unset this is None and both paths below
# start with the literal text "None/" — confirm the variable is always exported.
root_path = os.environ.get('HOST_PATH')

# Folder scanned for the training CSV files.
users_csv = "{}/data/files/treino".format(root_path)

# Destination of the merged Parquet file.
output_parquet = "{}/artifacts/parquets/users_merged_data.parquet".format(root_path)

# MongoDB connection URI; the MONGODB_CON environment variable overrides the default.
# NOTE(review): the fallback URI embeds a username and password in the notebook —
# consider requiring MONGODB_CON instead of shipping credentials in source.
mongo_connection = os.environ.get("MONGODB_CON", "mongodb://admin:adminpassword@localhost:27018")

##### Mesclando dados de treino e gerando Parquet

In [3]:
# Translation table: newlines, single quotes and square brackets each become a space.
_HISTORY_NOISE = str.maketrans({"\n": " ", "'": " ", "[": " ", "]": " "})


def _clean_history(hist):
    """Replace newlines, single quotes and square brackets with spaces, then trim.

    Non-string values are returned unchanged.
    """
    if not isinstance(hist, str):
        return hist
    return hist.translate(_HISTORY_NOISE).strip()


# os.listdir returns entries in arbitrary order; sorting keeps the merged row
# order reproducible from one run to the next.
csv_files = sorted(f for f in os.listdir(users_csv) if f.endswith('.csv'))
dfs = []

for csv_file in csv_files:
    csv_path = os.path.join(users_csv, csv_file)
    try:
        df = pd.read_csv(csv_path)
        df['history'] = df['history'].apply(_clean_history)
        dfs.append(df)
    except Exception as e:
        # Best effort: a file that cannot be read (or has no `history` column)
        # is reported and skipped so the remaining files are still merged.
        print(f"❌ Error reading {csv_file}: {e}")

# pd.concat fails with an opaque "No objects to concatenate" on an empty list;
# raise the same exception type with the actual cause instead.
if not dfs:
    raise ValueError(f"No CSV file could be loaded from {users_csv}")

all_data = pd.concat(dfs, ignore_index=True)

# Make sure the destination folder exists before writing the Parquet file.
output_dir = os.path.dirname(output_parquet)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)

table = pa.Table.from_pandas(all_data)
pq.write_table(table, output_parquet)

# Report the number of files actually merged (skipped files are excluded).
print(f"✅ Merged {len(dfs)} CSV files into {output_parquet}")
all_data.head()

✅ Merged 6 CSV files into ../../shared-files/artifacts/parquets/users_merged_data.parquet


Unnamed: 0,userId,userType,historySize,history,timestampHistory,numberOfClicksHistory,timeOnPageHistory,scrollPercentageHistory,pageVisitsCountHistory,timestampHistory_new
0,fbb963d61eb8149e7f43b1bd905457ba5e106a830ddc27...,Non-Logged,2,"80aa7bb2-adce-4a55-9711-912c407927a1, d9e5f15d...","1657908085200, 1659634203762","0, 0","71998, 115232","81.58, 73.36","1, 1","1657908085200, 1659634203762"
1,17f1083e6079b0f28f7820a6803583d1c1b405c0718b11...,Non-Logged,2,"19ba89fc-1e06-4c5d-9c57-4a3088dc0511, e273dba4...","1657111508570, 1657481309920","68, 12","131495, 43733","51.74, 35.49","1, 1","1657111508570, 1657481309920"
2,528a8d7a2af73101da8d6709c1ec875b449a5a58749a99...,Non-Logged,2,"59a61a8a-cc52-453f-b1cd-2bd019e9d574, a0562805...","1657823890328, 1660141444328","55, 9","159042, 10336","62.19, 48.28","1, 1","1657823890328, 1660141444328"
3,2dd18b58a634a4e77181a202cf152df6169dfb3e4230ef...,Non-Logged,2,"233f8238-2ce0-470f-a9d5-0e0ac530382a, 037155f4...","1656963373076, 1657091888917","0, 0","193579, 20519","31.03, 31.9","1, 1","1656963373076, 1657091888917"
4,97e1439d485b0630e12818d3df84ff67d08475ef6ebeb0...,Logged,2,"385044ad-3876-4188-83fa-f560435c1a9c, 2f754502...","1657618607633, 1659536839832","57, 38","220000, 130000","52.65, 53.37","1, 1","1657618607633, 1659536839832"


##### Conectando ao MongoDB

In [4]:
# Open the MongoDB client and keep handles to the database and collection
# that the following cells write to and read from.
client = MongoClient(mongo_connection)
db = client.get_database("mydatabase")
collection = db.get_collection("users")


In [5]:
# Reload the merged dataset from the Parquet file written earlier.
df = pd.read_parquet(output_parquet)

# One dict per row (column name -> value), the shape used for the upserts below.
dados_para_inserir = df.to_dict("records")

##### Verifica se o dado já existe no MongoDB (com base no userId): atualiza o registro existente ou insere um novo

In [6]:
# Number of upsert operations sent to the server per bulk_write call.
UPSERT_BATCH_SIZE = 1000

if dados_para_inserir:
    # Send the upserts in batches instead of one update_one round trip per record.
    for start in range(0, len(dados_para_inserir), UPSERT_BATCH_SIZE):
        operations = []
        for dado in dados_para_inserir[start:start + UPSERT_BATCH_SIZE]:
            dado["_id"] = dado["userId"]  # Use userId as the primary key
            operations.append(
                UpdateOne(
                    {"_id": dado["_id"]},  # Match an existing document with this userId
                    {"$set": dado},        # Overwrite its fields when it already exists
                    upsert=True            # Insert it when it does not exist
                )
            )
        # ordered=True applies operations in list order, so when the same userId
        # appears more than once the last record still wins, as in a sequential loop.
        collection.bulk_write(operations, ordered=True)
    print(f"✅ {len(dados_para_inserir)} documentos inseridos com sucesso no MongoDB!")
else:
    print("❌ Nenhum dado para inserir.")

✅ 577942 documentos inseridos com sucesso no MongoDB!


In [7]:
# Print only the first 5 documents returned by the collection.
sample_cursor = collection.find().limit(5)
for doc in sample_cursor:
    print(doc)

{'_id': 'fbb963d61eb8149e7f43b1bd905457ba5e106a830ddc27288434101e7252ef57', 'history': '80aa7bb2-adce-4a55-9711-912c407927a1, d9e5f15d-b441-4d8b-bee4-462b106d3916', 'historySize': 2, 'numberOfClicksHistory': '0, 0', 'pageVisitsCountHistory': '1, 1', 'scrollPercentageHistory': '81.58, 73.36', 'timeOnPageHistory': '71998, 115232', 'timestampHistory': '1657908085200, 1659634203762', 'timestampHistory_new': '1657908085200, 1659634203762', 'userId': 'fbb963d61eb8149e7f43b1bd905457ba5e106a830ddc27288434101e7252ef57', 'userType': 'Non-Logged'}
{'_id': '17f1083e6079b0f28f7820a6803583d1c1b405c0718b11a18d30b1620f643b23', 'history': '19ba89fc-1e06-4c5d-9c57-4a3088dc0511, e273dba4-136c-45fb-bdd6-0cc57b13aaf0', 'historySize': 2, 'numberOfClicksHistory': '68, 12', 'pageVisitsCountHistory': '1, 1', 'scrollPercentageHistory': '51.74, 35.49', 'timeOnPageHistory': '131495, 43733', 'timestampHistory': '1657111508570, 1657481309920', 'timestampHistory_new': '1657111508570, 1657481309920', 'userId': '17f10