#### Cargaremos la información de mysql a los distintas fuentes que utilizaremos

In [4]:
import sqlalchemy as db
from sqlalchemy import text
import pandas as pd

In [5]:
engine = db.create_engine("mysql://root:root@192.168.149.165:3310/retail_db")
conn = engine.connect()

In [6]:
customers_df = pd.read_sql_query(text('SELECT * FROM customers'), con=conn)

In [7]:
orders_df = pd.read_sql_query(text('SELECT * FROM orders'), con=conn)

In [8]:
order_items_df = pd.read_sql_query(text('SELECT * FROM order_items'), con=conn)

In [9]:
products_df = pd.read_sql_query(text('SELECT * FROM products'), con=conn)

In [10]:
categories_df = pd.read_sql_query(text('SELECT * FROM categories'), con=conn)

In [11]:
departments_df = pd.read_sql_query(text('SELECT * FROM departments'), con=conn)

#### Cargamos los datos a Azure Data Lake

In [12]:
from azure.storage.blob import ContainerClient
import io

In [13]:
conn_str = "BlobEndpoint=https://adlsdatapath.blob.core.windows.net/;QueueEndpoint=https://adlsdatapath.queue.core.windows.net/;FileEndpoint=https://adlsdatapath.file.core.windows.net/;TableEndpoint=https://adlsdatapath.table.core.windows.net/;SharedAccessSignature=sv=2021-12-02&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2023-06-01T10:33:16Z&st=2023-03-17T02:33:16Z&spr=https&sig=14f9a42cRquiSn5bpy4dbLbWVeH2tRonE2AsedPIluw%3D"
container = "source"

container_client = ContainerClient.from_connection_string(
    conn_str=conn_str, 
    container_name=container
)


In [14]:
output = io.StringIO()
output = customers_df.to_csv(encoding = "utf-8", index=False)
container_client.upload_blob("retail/customers", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fc2c931f880>

In [15]:
output = io.StringIO()
output = orders_df.to_csv(encoding = "utf-8", index=False)
container_client.upload_blob("retail/orders", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fc2c92aca00>

In [16]:
output = io.StringIO()
output = order_items_df.to_csv(encoding = "utf-8", index=False)
container_client.upload_blob("retail/order_items", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fc2c92ace80>

In [17]:
output = io.StringIO()
output = products_df.to_csv(encoding = "utf-8", index=False)
container_client.upload_blob("retail/products", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fc2c936af40>

In [18]:
output = io.StringIO()
output = categories_df.to_csv(encoding = "utf-8", index=False)
container_client.upload_blob("retail/categories", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fc2c925bbe0>

In [19]:
output = io.StringIO()
output = departments_df.to_csv(encoding = "utf-8", index=False)
container_client.upload_blob("retail/departments", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fc2c925f700>

#### Cargamos los datos a Cloud Storage

In [20]:
import os
from google.cloud.storage import Client

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/user/app/fresh-bloom-372404-c792e5b41e7f.json"


In [21]:
client = Client()
bucket = client.get_bucket('source-projects')


In [23]:
bucket.blob('retail/customers').upload_from_string(customers_df.to_csv(encoding = "utf-8", index=False), 'text/csv')

In [24]:
bucket.blob('retail/orders').upload_from_string(orders_df.to_csv(encoding = "utf-8", index=False), 'text/csv')

In [25]:
bucket.blob('retail/order_items').upload_from_string(order_items_df.to_csv(encoding = "utf-8", index=False), 'text/csv')

In [26]:
bucket.blob('retail/products').upload_from_string(products_df.to_csv(encoding = "utf-8", index=False), 'text/csv')

In [27]:
bucket.blob('retail/categories').upload_from_string(categories_df.to_csv(encoding = "utf-8", index=False), 'text/csv')

In [28]:
bucket.blob('retail/departments').upload_from_string(departments_df.to_csv(encoding = "utf-8", index=False), 'text/csv')

#### Cargamos los datos a MongoDB

In [None]:
from pymongo import MongoClient
def get_database():
    # Provide the mongodb atlas url to connect python to mongodb using pymongo
    CONNECTION_STRING = "mongodb+srv://m001-student:mCxRoc3yh6nf2Xyv@sandbox.okkbl.mongodb.net/?retryWrites=true&w=majority"

    #Create a connection using MongoClient. You can import MongoClient or use pymongo.MongoClient
    client = MongoClient(CONNECTION_STRING)

    return client['retail_db']

In [None]:
df = customers_df.copy()
df.reset_index(inplace=False)
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["customers"].insert_many(df_to_dict)

In [24]:
df = orders_df.copy()
df.reset_index(inplace=False)
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["orders"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7f641cc99b50>

In [25]:
df = order_items_df.copy()
df.reset_index(inplace=False)
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["order_items"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7f641e416e50>

In [26]:
df = products_df.copy()
df.reset_index(inplace=False)
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["products"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7f64188f6550>

In [27]:
df = categories_df.copy()
df.reset_index(inplace=False)
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["categories"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7f641b33fd60>

In [28]:
df = departments_df.copy()
df.reset_index(inplace=False)
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["departments"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7f6419305340>