In [1]:
import os
import logging
import shutil 
from dotenv import load_dotenv
from zoneinfo import ZoneInfo
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


# Load settings from a local .env file into the process environment.
load_dotenv()
MONGO_DB = os.getenv("MONGO_DB")
MONGO_URI = os.getenv("MONGO_URI")
DBKS_TOKEN = os.getenv("DBKS_TOKEN")

# Fail fast with a readable message: MONGO_URI is handed to the Spark session
# below and to every MongoDB read in this notebook.
if not MONGO_URI:
    raise RuntimeError(
        "MONGO_URI is not set; define it in the environment or in the .env file"
    )

# MongoDB Spark connector assembly jar, expected next to the notebook.
jar_path = './mongo-spark-connector_2.12-3.0.2-assembly.jar'

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

spark = SparkSession.builder \
    .appName("MongoDB_Spark_Connection") \
    .config("spark.mongodb.input.uri", MONGO_URI) \
    .config("spark.jars", jar_path) \
    .getOrCreate()

# NOTE(review): this is logged as soon as the session object exists; whether
# MongoDB is actually reachable is presumably only known at the first read.
logging.info("Connection succesfully established")


your 131072x1 screen size is bogus. expect trouble
24/10/04 20:38:32 WARN Utils: Your hostname, DanteMutt resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/10/04 20:38:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/10/04 20:38:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-10-04 20:38:35,486 - INFO - Connection succesfully established


In [2]:
# Configure a reader for the "_User" collection through the MongoDB connector.
user_reader = (
    spark.read
    .format("mongo")
    .option("uri", MONGO_URI)
    .option("collection", "_User")
    .option("partitioner", "MongoPaginateBySizePartitioner")
)
df = user_reader.load()

# Show the first 10 rows.
df.show(10)

24/10/04 20:38:46 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/10/04 20:38:52 WARN MongoInferSchema: Array Field 'devices' contains conflicting types converting to StringType
24/10/04 20:38:54 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+--------------------+--------------------+-------------------+--------------------+----------+------------------+----------------+--------------------+-------------+----------------+----------------+----------------+--------------+-----------+----------------+-----------------+---------------+--------------+--------------------+------------+--------------------+--------------------+--------------------+-------------+----------+------------------------+---------------+--------------------+--------+-------------+----------+------------+-----------+---------+-----------------------+------------------+----------------+--------------------+--------------------+--------------------+------------+------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------+-----------------+--------------------+--------------------+----------------+--------------------+------------------+--------+--------------+-------

                                                                                

In [2]:
def save_json(df,collection):
    """Write a DataFrame to ./exports/<collection>.json as one JSON array.

    Args:
        df: DataFrame-like object exposing ``toJSON().collect()`` that yields
            one JSON string per row.
        collection: Collection name; leading underscores are stripped to build
            the output file name (e.g. "_User" -> "User.json").

    Every row is collected into local memory before writing, so this is only
    suitable for results that fit on the driver.
    """
    collection = collection.lstrip('_')
    output_dir = "./exports"
    output_path = output_dir + "/" + collection + ".json"

    # The export folder may not exist yet on a fresh checkout.
    os.makedirs(output_dir, exist_ok=True)

    json_data = df.toJSON().collect()

    # Explicit UTF-8 so non-ASCII values do not depend on the platform default encoding.
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write('[\n' + ',\n'.join(json_data) + '\n]')

In [8]:
def get_dataframe(collection, schema=None):
    """Load a MongoDB collection as a Spark DataFrame.

    Args:
        collection: Name of the MongoDB collection to read.
        schema: Optional explicit schema (e.g. a ``StructType``). When omitted
            the connector infers the schema, exactly as before.

    Returns:
        The DataFrame produced by the "mongo" reader for ``collection``.
    """
    reader = spark.read \
            .format("mongo") \
            .option("uri", MONGO_URI) \
            .option("collection", collection) \
            .option("partitioner", "MongoPaginateBySizePartitioner")
    # An explicit schema lets callers pin column types instead of relying on inference.
    if schema is not None:
        reader = reader.schema(schema)
    return reader.load()

In [9]:
def save_parquet(df,collection,repartition=1):
    """Write ``df`` as parquet under ./parquet/<collection>, replacing earlier output.

    Args:
        df: DataFrame to persist.
        collection: Collection name, used verbatim as the folder name.
        repartition: Number of partitions passed to ``df.repartition`` before writing.
    """
    target_dir = "./parquet/" + collection
    repartitioned = df.repartition(repartition)
    writer = repartitioned.write.mode("overwrite")
    writer.parquet(target_dir)


In [21]:
def move_file(src_folder,dest_folder,new_filename):
    """Move one ``.parquet`` file out of ``src_folder`` and rename it.

    The file is moved to ``<dest_folder>/<new_filename>.parquet``, with leading
    underscores stripped from ``new_filename``.

    Args:
        src_folder: Folder to search for files ending in ".parquet".
        dest_folder: Destination folder; created if it does not exist.
        new_filename: Base name (without extension) for the moved file.

    Only the first matching file (in sorted order) is moved. A warning is
    logged when no file matches or when other matching files are left behind.
    """
    new_filename = new_filename.lstrip('_')

    # Sorted so the chosen file does not depend on os.listdir's arbitrary order.
    parquet_files = sorted(
        name for name in os.listdir(src_folder) if name.endswith(".parquet")
    )
    if not parquet_files:
        logging.warning(f"No .parquet file found in {src_folder}; nothing was moved")
        return
    if len(parquet_files) > 1:
        logging.warning(
            f"{len(parquet_files)} .parquet files found in {src_folder}; "
            f"only {parquet_files[0]} is moved"
        )

    # shutil.move fails if the destination folder is missing.
    os.makedirs(dest_folder, exist_ok=True)

    src_file_path = os.path.join(src_folder, parquet_files[0])
    dest_file_path = os.path.join(dest_folder, new_filename + ".parquet")
    shutil.move(src_file_path, dest_file_path)
    logging.info(f"Archivo movido y renombrado a: {dest_file_path}")

In [18]:
def pipeline(collection,repartition = 1):
    """Export one MongoDB collection to a single parquet file in ./exports.

    Steps: read the collection (get_dataframe), write it under
    ./parquet/<collection> (save_parquet), then move one part file into
    ./exports (move_file). Progress is logged after each step.

    Args:
        collection: MongoDB collection name.
        repartition: Number of partitions used for the parquet write.
    """
    dest_folder = "./exports"
    src_folder = "./parquet/" + collection

    # Step 1: fetch
    logging.info(f"Reading Collection: {collection}")
    df = get_dataframe(collection)
    logging.info(f"Collection: {collection} succesfully fetched")

    # Step 2: write parquet
    save_parquet(df,collection,repartition)
    logging.info(f"Collection: {collection} succesfully saved")

    # Step 3: publish the part file
    move_file(src_folder,dest_folder,collection)
    logging.info(f"Collection: {collection} succesfully moved to {dest_folder}")

In [15]:
# Export the catalogue collections one after another with the default settings.
collections = ["State","Country","Municipality","ZipCode"]
for collection in collections:
    pipeline(collection)
# Location, PocketBalance and User are exported in the cells that follow.

2024-09-30 10:18:38,381 - INFO - Reading Collection: State
2024-09-30 10:18:42,859 - INFO - Collection: State succesfully fetched          
2024-09-30 10:18:44,114 - INFO - Collection: State succesfully saved
2024-09-30 10:18:44,117 - INFO - Collection: State succesfully moved to ./exports
2024-09-30 10:18:44,119 - INFO - Reading Collection: Country


Archivo movido y renombrado a: ./exports/State.parquet


2024-09-30 10:18:46,029 - INFO - Collection: Country succesfully fetched
2024-09-30 10:18:47,062 - INFO - Collection: Country succesfully saved
2024-09-30 10:18:47,064 - INFO - Collection: Country succesfully moved to ./exports
2024-09-30 10:18:47,066 - INFO - Reading Collection: Municipality


Archivo movido y renombrado a: ./exports/Country.parquet


2024-09-30 10:19:02,846 - INFO - Collection: Municipality succesfully fetched   
2024-09-30 10:19:11,119 - INFO - Collection: Municipality succesfully saved     
2024-09-30 10:19:11,121 - INFO - Collection: Municipality succesfully moved to ./exports
2024-09-30 10:19:11,122 - INFO - Reading Collection: ZipCode


Archivo movido y renombrado a: ./exports/Municipality.parquet


2024-09-30 10:19:29,847 - INFO - Collection: ZipCode succesfully fetched        
2024-09-30 10:19:49,232 - INFO - Collection: ZipCode succesfully saved          
2024-09-30 10:19:49,235 - INFO - Collection: ZipCode succesfully moved to ./exports


Archivo movido y renombrado a: ./exports/ZipCode.parquet


In [19]:
# Export the PocketBalance collection with the default repartition=1.
pipeline("PocketBalance")


Archivo movido y renombrado a: ./exports/PocketBalance.parquet


In [20]:
# Export the "_User" collection; move_file() strips the leading underscore
# from the exported file name.
pipeline("_User")

2024-09-30 10:24:58,775 - INFO - Reading Collection: _User
2024-09-30 10:25:19,784 - INFO - Collection: _User succesfully fetched          
24/09/30 10:25:19 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2024-09-30 10:31:29,288 - INFO - Collection: _User succesfully saved            
2024-09-30 10:31:29,291 - INFO - Collection: _User succesfully moved to ./exports


Archivo movido y renombrado a: ./exports/_User.parquet


### Location

In [26]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Manually defined schema for the Location read. Every column is a nullable
# string except the two timestamp columns. Per the original author's notes,
# country, municipality, state, user and zipCode were previously "void" and
# are therefore pinned to StringType.
_location_timestamp_fields = ("_created_at", "_updated_at")
_location_field_names = [
    "_created_at",
    "_id",
    "_p_country",
    "_p_municipality",
    "_p_state",
    "_p_user",
    "_p_zipCode",
    "_updated_at",
    "country",
    "countryName",
    "externalNumber",
    "internalNumber",
    "municipality",
    "municipalityName",
    "neighborhood",
    "state",
    "stateName",
    "street",
    "user",
    "zipCode",
    "zipCodeName",
]

schema = StructType([
    StructField(
        field_name,
        TimestampType() if field_name in _location_timestamp_fields else StringType(),
        True,
    )
    for field_name in _location_field_names
])


In [27]:
# Bind the target collection here: without this, `collection` still holds
# whatever an earlier cell left in it (the last name of the export loop), and
# the Location schema would be applied to the wrong collection.
collection = "Location"

# Read the collection with the explicit schema instead of connector inference.
df = spark.read \
            .format("mongo") \
            .option("uri", MONGO_URI) \
            .option("collection", collection) \
            .option("partitioner", "MongoPaginateBySizePartitioner") \
            .schema(schema) \
            .load()

In [28]:
# Print the schema of df to check the column types.
df.printSchema()

root
 |-- _created_at: timestamp (nullable = true)
 |-- _id: string (nullable = true)
 |-- _p_country: string (nullable = true)
 |-- _p_municipality: string (nullable = true)
 |-- _p_state: string (nullable = true)
 |-- _p_user: string (nullable = true)
 |-- _p_zipCode: string (nullable = true)
 |-- _updated_at: timestamp (nullable = true)
 |-- country: string (nullable = true)
 |-- countryName: string (nullable = true)
 |-- externalNumber: string (nullable = true)
 |-- internalNumber: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- municipalityName: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- state: string (nullable = true)
 |-- stateName: string (nullable = true)
 |-- street: string (nullable = true)
 |-- user: string (nullable = true)
 |-- zipCode: string (nullable = true)
 |-- zipCodeName: string (nullable = true)



In [29]:
# Persist the DataFrame read with the explicit schema, then publish one part
# file to ./exports. Same steps as pipeline(), minus the read.
collection = "Location"
dest_folder = "./exports"
src_folder = "./parquet/" + collection

logging.info(f"Collection: {collection} succesfully fetched")

save_parquet(df,collection)
logging.info(f"Collection: {collection} succesfully saved")

move_file(src_folder,dest_folder,collection)
logging.info(f"Collection: {collection} succesfully moved to {dest_folder}")

2024-09-30 10:55:04,709 - INFO - Collection: Location succesfully fetched
2024-09-30 10:55:50,714 - INFO - Collection: Location succesfully saved         
2024-09-30 10:55:50,717 - INFO - Archivo movido y renombrado a: ./exports/Location.parquet
2024-09-30 10:55:50,718 - INFO - Collection: Location succesfully moved to ./exports


### Transaction

In [30]:
from pyspark.sql.types import (
    StructType, StructField, StringType, BooleanType, DoubleType,
    IntegerType, TimestampType, ArrayType, MapType
)

# Explicit schema for the Transaction read (81 top-level fields, all nullable).
# NOTE(review): accumulatedAmountReturned, cashback and tip are IntegerType
# while the other amount/fee/balance fields are DoubleType — confirm the
# stored values are always integers.
schema = StructType([
    # Map of string keys to {r, w} boolean flags.
    StructField("_acl", MapType(StringType(), StructType([
        StructField("r", BooleanType(), True),
        StructField("w", BooleanType(), True)
    ])), True),
    
    StructField("_created_at", TimestampType(), True),
    StructField("_id", StringType(), True),
    StructField("_p_card", StringType(), True),
    StructField("_p_deposit", StringType(), True),
    StructField("_p_payer", StringType(), True),
    StructField("_p_payerImageLink", StringType(), True),
    StructField("_p_receiver", StringType(), True),
    StructField("_p_receiverGroup", StringType(), True),
    StructField("_p_receiverImageLink", StringType(), True),
    StructField("_p_swapCard", StringType(), True),
    StructField("_p_transactionCanceled", StringType(), True),
    
    StructField("_rperm", ArrayType(StringType(), True), True),
    StructField("_updated_at", TimestampType(), True),
    StructField("_wperm", ArrayType(StringType(), True), True),
    
    StructField("acceptAmex", BooleanType(), True),
    StructField("accumulatedAmountCharged", DoubleType(), True),
    StructField("accumulatedAmountCompensated", DoubleType(), True),
    StructField("accumulatedAmountReturned", IntegerType(), True),
    
    StructField("address", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("authentication", StringType(), True),
    StructField("authorizationNumber", StringType(), True),
    
    StructField("bank", StringType(), True),
    StructField("bashEncrypt", BooleanType(), True),
    StructField("bin", StringType(), True),
    StructField("cardBrand", StringType(), True),
    StructField("cardId", StringType(), True),
    StructField("cardType", StringType(), True),
    
    StructField("cashback", IntegerType(), True),
    StructField("category", StringType(), True),
    StructField("chargeback_category", StringType(), True),
    StructField("chargeback_date", TimestampType(), True),
    
    StructField("conciliationType", StringType(), True),
    StructField("country", StringType(), True),
    StructField("currency", StringType(), True),
    StructField("depositDate", TimestampType(), True),
    StructField("description", StringType(), True),
    
    StructField("deviceToken", StringType(), True),
    StructField("employee", StringType(), True),  # Adjust if this turns out to be void or has a more specific type
    StructField("gateway", StringType(), True),
    StructField("gatewayFee", DoubleType(), True),
    
    StructField("installments", IntegerType(), True),
    StructField("isInCondor", BooleanType(), True),
    StructField("isSwapGroupTransaction", BooleanType(), True),
    StructField("last4", StringType(), True),
    
    StructField("location", ArrayType(DoubleType(), True), True),
    StructField("message", StringType(), True),
    StructField("mit", StringType(), True),
    StructField("movements", ArrayType(StringType(), True), True),
    
    StructField("newBalance", DoubleType(), True),
    StructField("oldBalance", DoubleType(), True),
    StructField("originalMovement", StringType(), True),
    
    StructField("payLater", BooleanType(), True),
    StructField("payedAt", TimestampType(), True),
    StructField("payerFee", DoubleType(), True),
    StructField("payerId", StringType(), True),
    StructField("payerName", StringType(), True),
    StructField("payerNewBalance", DoubleType(), True),
    StructField("payerOldBalance", DoubleType(), True),
    
    StructField("provider", StringType(), True),
    StructField("reaction", StringType(), True),
    StructField("receiverFee", DoubleType(), True),
    StructField("receiverId", StringType(), True),
    StructField("receiverName", StringType(), True),
    StructField("receiverNewBalance", DoubleType(), True),
    StructField("receiverOldBalance", DoubleType(), True),
    
    StructField("requestId", StringType(), True),
    StructField("requestIp", StringType(), True),
    StructField("retrievalReference", StringType(), True),
    StructField("slackId", StringType(), True),
    StructField("source", StringType(), True),
    
    StructField("state", StringType(), True),
    StructField("status", StringType(), True),
    StructField("tenant", StringType(), True),
    StructField("terminalIdentification", StringType(), True),
    StructField("tip", IntegerType(), True),
    
    StructField("token", StringType(), True),
    StructField("transactionType", StringType(), True),
    StructField("transactionalSession", StringType(), True),
    StructField("url", StringType(), True)
])

In [31]:
collection = "Transaction"

# Read the Transaction collection with the explicit schema defined above.
df = (
    spark.read
    .schema(schema)
    .format("mongo")
    .option("uri", MONGO_URI)
    .option("collection", collection)
    .option("partitioner", "MongoPaginateBySizePartitioner")
    .load()
)

In [6]:
# Preview the rows of the current df.
df.show()

24/09/27 14:11:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+--------------------+----------+---------------+------------------+----------------+-----------------+----------------+----------------+--------------------+-------------------+----------------------+--------------------+--------------------+------+----------+------------------------+----------------------------+-------------------------+-------+------+--------------+-------------------+-------------+-----------+--------+-------------+------+-------------+--------+------------+-------------------+---------------+----------------+-------------+--------+-----------+--------------------+--------------------+--------+-------+----------+------------+----------+----------------------+-----+--------------------+--------------------+----+-----------+----------+----------+----------------+--------+--------------------+--------+----------+--------------------+---------------+---------------+--------+--------+-----------+--------------------+--------------------+------------

In [32]:
# Number of top-level columns in df.
print(len(df.columns))


81


In [5]:
# Print the schema of df to check the column types.
df.printSchema()

root
 |-- _acl: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- r: boolean (nullable = true)
 |    |    |-- w: boolean (nullable = true)
 |-- _created_at: timestamp (nullable = true)
 |-- _id: string (nullable = true)
 |-- _p_card: string (nullable = true)
 |-- _p_deposit: string (nullable = true)
 |-- _p_payer: string (nullable = true)
 |-- _p_payerImageLink: string (nullable = true)
 |-- _p_receiver: string (nullable = true)
 |-- _p_receiverGroup: string (nullable = true)
 |-- _p_receiverImageLink: string (nullable = true)
 |-- _p_swapCard: string (nullable = true)
 |-- _p_transactionCanceled: string (nullable = true)
 |-- _rperm: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- _updated_at: timestamp (nullable = true)
 |-- _wperm: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- acceptAmex: boolean (nullable = true)
 |-- accumulatedAmountCharged: double (nullable = t

In [33]:
# Write df under ./parquet/<collection> using two partitions, replacing any
# earlier output.
# NOTE(review): the upload cell later expects ./exports/TransactionPart1.parquet
# and TransactionPart2.parquet; no visible cell creates them — presumably the
# part files are moved and renamed by hand. Confirm.
output_path = "./parquet/" + collection
transaction_writer = df.repartition(2).write.mode("overwrite")
transaction_writer.parquet(output_path)


                                                                                

### Upsert


In [4]:
# Mexico City time zone, resolved through zoneinfo.
tz_mexico = ZoneInfo("America/Mexico_City")

# Current moment in that zone, truncated to 00:00:00 of the same day.
ahora_mexico = datetime.now(tz_mexico)
fecha_hoy = ahora_mexico.replace(hour=0, minute=0, second=0, microsecond=0)

# ISO 8601 text form of the cut-off, used in the MongoDB filter.
fecha_hoy_iso = fecha_hoy.isoformat()

print(fecha_hoy)

2024-09-26 00:00:00-06:00


In [8]:
collection = "PocketBalance"

# Aggregation pipeline (JSON text) that keeps documents created or updated at
# or after today's cut-off. It is named `match_pipeline` so that it does not
# overwrite the pipeline() export function defined earlier in the notebook.
match_pipeline = f"""[{{ 
    "$match": {{
        "$or": [
            {{"_updated_at": {{"$gte": {{"$date": "{fecha_hoy_iso}"}}}}}},
            {{"_created_at": {{"$gte": {{"$date": "{fecha_hoy_iso}"}}}}}}
        ]
    }}
}}]"""

# Read the collection from MongoDB with the filter applied.
df_filtrado = spark.read \
            .format("mongo") \
            .option("uri", MONGO_URI) \
            .option("collection", collection) \
            .option("partitioner", "MongoPaginateBySizePartitioner") \
            .option("pipeline", match_pipeline) \
            .load()

# Also dump the filtered rows to ./exports/<collection>.json.
save_json(df_filtrado,collection)

                                                                                

In [9]:
# Write the filtered rows as parquet at ./exports/<collection>.parquet.
# NOTE(review): with mode("overwrite") this replaces whatever already lives at
# that path, including the single-file full export produced by pipeline() and
# move_file() for the same collection. The upload cell opens that path as a
# regular file — confirm this is intended.
output_path = "./exports/" + collection + ".parquet"
incremental_writer = df_filtrado.write.mode("overwrite")
incremental_writer.parquet(output_path)


                                                                                

### Read Parquet


In [15]:
# Read the PocketBalance export back to verify its contents.
parquet_reader = spark.read
df_parquet = parquet_reader.parquet("./exports/PocketBalance.parquet")
df_parquet.show()

+--------------------+--------------------+----------+----------------+------------+--------------------+--------------------+--------------------+--------------------+-------+------------------+----------+------------------------------+------------------------+------+--------------------------+-----------+----------+
|                _acl|         _created_at|       _id|        _p_owner|      _rperm|         _updated_at|   accountActivityAt|   accountExternalId|   accountInternalId|balance|             clabe|isInCondor|isInCondorTransactionalProfile|last_stp_request_message|status|stp_communication_attempts|swapGroupId|    userId|
+--------------------+--------------------+----------+----------------+------------+--------------------+--------------------+--------------------+--------------------+-------+------------------+----------+------------------------------+------------------------+------+--------------------------+-----------+----------+
|{NpcqRJRRlt -> {t...|2024-09-26 15:22:.

In [12]:
# Read a JSON export back; multiLine=True because save_json() writes a single
# JSON array that spans many lines.
# NOTE(review): no visible cell produces State.json — presumably created in an
# earlier session via save_json(); confirm.
file_path = "./exports/State.json"
json_reader = spark.read
json_df = json_reader.json(file_path, multiLine=True)
json_df.show()

+--------------------+----------+------------------+--------------------+----+---------+----------+--------+--------------------+--------+
|         _created_at|       _id|        _p_country|         _updated_at|code|countryId|externalId|iataCode|                name| stateId|
+--------------------+----------+------------------+--------------------+----+---------+----------+--------+--------------------+--------+
|2020-01-02T15:12:...|LzJCjUPF7h|Country$elWGUJgxg2|2021-12-07T13:34:...|  TC|      236|        27|     TAB|             TABASCO|      27|
|2020-01-02T15:12:...|MYpxq5HgGJ|Country$elWGUJgxg2|2021-12-07T13:32:...|  BS|      236|         3|     BCS| BAJA CALIFORNIA SUR|       3|
|2020-01-02T15:12:...|dQoyDGabCk|Country$elWGUJgxg2|2021-12-07T13:33:...|  PL|      236|        21|     PUE|              PUEBLA|      21|
|2020-01-02T15:12:...|tWnqTFckHb|Country$elWGUJgxg2|2021-12-07T13:32:...|  BC|      236|         2|      BC|     BAJA CALIFORNIA|       2|
|2020-01-02T15:12:...|z0GVQ

### Databricks

In [37]:
import requests

# Base URL of the Databricks workspace that receives the uploads.
databricks_instance = 'https://clip-banking-dev.cloud.databricks.com'
# Bearer-token header sent with every request; DBKS_TOKEN is read from the
# environment in the first cell.
headers = {'Authorization': f'Bearer {DBKS_TOKEN}'}

def upload_to_dbfs(local_file_path, dbfs_path):
    """Upload a local file to Databricks with a PUT to /api/2.0/fs/files.

    Args:
        local_file_path: Path of the local file to send.
        dbfs_path: Absolute destination path, appended verbatim to the
            endpoint URL (so it must start with "/").

    Returns:
        True when the server answers 204, False otherwise. The outcome is
        logged in both cases.
    """
    url = f'{databricks_instance}/api/2.0/fs/files{dbfs_path}'

    with open(local_file_path, 'rb') as f:
        # Pass the open file object so requests streams it from disk instead
        # of holding the whole parquet file in memory.
        response = requests.put(url, headers=headers, data=f)

    if response.status_code != 204:
        logging.error(f"Error al subir el archivo: {response.status_code}, {response.text}")
        return False

    logging.info(f'Archivo {local_file_path} subido exitosamente a {dbfs_path}')
    return True

In [38]:
# Upload every single-file export from ./exports to the mongo folder under
# /Volumes/sandbox_banking/mutt_data_banking.
collections = ["State","Country","Municipality","ZipCode","PocketBalance","Location","User"]
for collection in collections:
    parquet_file = f'{collection}.parquet'
    dbfs_path = f'/Volumes/sandbox_banking/mutt_data_banking/mongo/{parquet_file}'
    local_path = f'./exports/{parquet_file}'
    upload_to_dbfs(local_file_path=local_path, dbfs_path=dbfs_path)

2024-09-30 11:23:04,636 - INFO - Archivo ./exports/State.parquet subido exitosamente a /Volumes/sandbox_banking/mutt_data_banking/mongo/State.parquet
2024-09-30 11:23:06,170 - INFO - Archivo ./exports/Country.parquet subido exitosamente a /Volumes/sandbox_banking/mutt_data_banking/mongo/Country.parquet
2024-09-30 11:23:08,123 - INFO - Archivo ./exports/Municipality.parquet subido exitosamente a /Volumes/sandbox_banking/mutt_data_banking/mongo/Municipality.parquet
2024-09-30 11:23:19,161 - INFO - Archivo ./exports/ZipCode.parquet subido exitosamente a /Volumes/sandbox_banking/mutt_data_banking/mongo/ZipCode.parquet
2024-09-30 11:23:45,475 - INFO - Archivo ./exports/PocketBalance.parquet subido exitosamente a /Volumes/sandbox_banking/mutt_data_banking/mongo/PocketBalance.parquet
2024-09-30 11:24:00,759 - INFO - Archivo ./exports/Location.parquet subido exitosamente a /Volumes/sandbox_banking/mutt_data_banking/mongo/Location.parquet
2024-09-30 11:25:35,885 - INFO - Archivo ./exports/User.

In [39]:
# Upload the two Transaction part files from ./exports to the same destination
# folder as the other exports.
collections = ["TransactionPart1","TransactionPart2"]
for collection in collections:
    parquet_file = f'{collection}.parquet'
    dbfs_path = f'/Volumes/sandbox_banking/mutt_data_banking/mongo/{parquet_file}'
    local_path = f'./exports/{parquet_file}'
    upload_to_dbfs(local_file_path=local_path, dbfs_path=dbfs_path)

2024-09-30 11:28:28,602 - INFO - Archivo ./exports/TransactionPart1.parquet subido exitosamente a /Volumes/sandbox_banking/mutt_data_banking/mongo/TransactionPart1.parquet
2024-09-30 11:31:04,546 - INFO - Archivo ./exports/TransactionPart2.parquet subido exitosamente a /Volumes/sandbox_banking/mutt_data_banking/mongo/TransactionPart2.parquet
