# Ingesting data with OneLake Part I


In this notebook we will learn how to index data from Microsoft [OneLake](https://learn.microsoft.com/en-us/fabric/onelake/onelake-overview) into Elasticsearch. This demonstration accompanies the article [Ingesting data with OneLake Part I](https://www.elastic.co/search-labs/blog/ingesting-data-with-onelake-part-i).


In [None]:
!python3 -m pip install elasticsearch==8.14 azure-identity azure-storage-file-datalake azure-cli python-docx

import os
import csv
import chardet

from docx import Document
from io import StringIO
from getpass import getpass
from google.colab import files
from elasticsearch import Elasticsearch, exceptions, helpers
from elasticsearch.helpers import bulk
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeDirectoryClient, DataLakeServiceClient

Collecting elasticsearch==8.14
  Downloading elasticsearch-8.14.0-py3-none-any.whl.metadata (7.2 kB)
Collecting azure-identity
  Downloading azure_identity-1.19.0-py3-none-any.whl.metadata (80 kB)
Collecting azure-storage-file-datalake
  Downloading azure_storage_file_datalake-12.17.0-py3-none-any.whl.metadata (16 kB)
Collecting azure-cli
  Downloading azure_cli-2.65.0-py3-none-any.whl.metadata (8.3 kB)
Collecting python-docx
  Downloading python_docx-1.1.2-py3-none-any.whl.metadata (2.0 kB)
Collecting elastic-transport<9,>=8.13 (from elasticsearch==8.14)
  Downloading elastic_transport-8.15.1-py3-none-any.whl.metadata (3.7 kB)
Collecting azure-core>=1.31.0 (from azure-identity)
  Downloading azure_core-1.32.0-py3-none-any.whl.metadata (39 kB)
Collecting msal>=1.30.0 (from azure-identity)
  Downloading msal-1.31.0-py3-none-any.whl.metadata (11 kB)
Collecting msa

Let's sign in with your Microsoft account. At the end of the process, the Google Colab console will display your available subscriptions. Select your subscription by entering the subscription number shown in the console.

In [None]:
!az login --allow-no-subscriptions # For no subscriptions login

Error loading command module 'mysql': cannot import name 'mysql_flexibleservers' from 'azure.mgmt.rdbms' (/usr/local/lib/python3.10/dist-packages/azure/mgmt/rdbms/__init__.py)
Error loading command module 'rdbms': No module named 'azure.mgmt.rdbms.mysql_flexibleservers'
To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code AYGZ24EJG to authenticate.

Retrieving tenants and subscriptions for the selection...
The following tenants don't contain accessible subscriptions. Use `az login --allow-no-subscriptions` to have tenant level access.
56783f2b-514d-4ff9-b3ab-3fdc5e7ace5d 'Kibernum'

[Tenant and subscription selection]

No     Subscription name          Subscription ID                       Tenant
-----  -------------------------  ------------------------------------  ------------------------------------
[1] *  N/A(tenant level account)  56783f2b-514d-4ff9-b3ab-3fdc5e7ace5d

## Variable Declaration

These variables configure the services we will use. Enter your Elasticsearch credentials when prompted:

In [None]:
ELASTIC_CLUSTER_ID = getpass("Elastic Cloud ID: ")
ELASTIC_API_KEY = getpass("Elastic Api Key: ")

ONELAKE_ACCOUNT_NAME = "onelake"
ONELAKE_WORKSPACE_NAME = "ShoesticWorkspace"
# Path in format <DataLake>.Lakehouse/Files/<Folder path>
ONELAKE_DATA_PATH = "shoesticDatalake.Lakehouse/Files/ProductsData"

Elastic Cloud ID: ··········
Elastic Api Key: ··········
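
The path convention noted in the comment above can be made explicit with a small helper. This is only a sketch: `build_onelake_account_url` and `build_onelake_data_path` are hypothetical names (they are not part of any Azure SDK), and they assemble the same strings this notebook uses, assuming the `<DataLake>.Lakehouse/Files/<Folder path>` layout.

```python
# Hypothetical helpers (not part of the Azure SDK) that assemble the
# OneLake endpoint and the Lakehouse data path used in this notebook.


def build_onelake_account_url(account_name: str) -> str:
    # OneLake exposes a Data Lake Storage-compatible endpoint on this host.
    return f"https://{account_name}.dfs.fabric.microsoft.com"


def build_onelake_data_path(lakehouse_name: str, folder_path: str) -> str:
    # Strip stray slashes so the result has exactly one separator per level.
    folder = folder_path.strip("/")
    return f"{lakehouse_name}.Lakehouse/Files/{folder}"


print(build_onelake_account_url("onelake"))
print(build_onelake_data_path("shoesticDatalake", "ProductsData"))
```

Building the path in one place makes it harder to mistype the case-sensitive `Files` segment.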


## Initialization of OneLake and Elasticsearch Services

In [None]:
# Microsoft token
token_credential = DefaultAzureCredential()

# OneLake services
service_client = DataLakeServiceClient(
    account_url=f"https://{ONELAKE_ACCOUNT_NAME}.dfs.fabric.microsoft.com",
    credential=token_credential,
)
file_system_client = service_client.get_file_system_client(ONELAKE_WORKSPACE_NAME)
directory_client = file_system_client.get_directory_client(ONELAKE_DATA_PATH)

# Elasticsearch client
es_client = Elasticsearch(
    cloud_id=ELASTIC_CLUSTER_ID,
    api_key=ELASTIC_API_KEY,
)

## Functions declaration

In [None]:
# Upload a file to a Lakehouse directory
def upload_file_to_directory(directory_client, local_path, file_name):
    file_client = directory_client.get_file_client(file_name)

    with open(local_path, mode="rb") as data:
        file_client.upload_data(data, overwrite=True)

    print(f"File: {file_name} uploaded to the data lake.")

In [None]:
# Get directory contents from your lake folder
def list_directory_contents(file_system_client, directory_name):
    paths = file_system_client.get_paths(path=directory_name)

    for path in paths:
        print(path.name + "\n")

In [None]:
# Get a file by name from your lake folder
def get_file_by_name(file_name, directory_client):
    return directory_client.get_file_client(file_name)

In [None]:
# Google Colab saves uploaded files in an internal folder. To prevent files from being renamed automatically, we must clean the directory first
def clean_directory(directory):
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            os.remove(file_path)
            print(f"File '{filename}' removed successfully.")
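
If cleaning the directory is not an option, the renamed files can be mapped back to their original names instead. The sketch below assumes that Colab marks a repeated upload by appending ` (1)`, ` (2)`, and so on before the extension. Both that pattern and the helper name `original_upload_name` are assumptions made for illustration.

```python
import re

# Assumed rename pattern: "<stem> (<number>)<.ext>", e.g. "products (1).csv".
_DUPLICATE_NAME = re.compile(r"^(?P<stem>.+) \(\d+\)(?P<ext>\.[^.]+)?$")


def original_upload_name(file_name: str) -> str:
    """Return the name without the assumed duplicate counter, if present."""
    match = _DUPLICATE_NAME.match(file_name)
    if match is None:
        return file_name  # Nothing to strip.
    return match.group("stem") + (match.group("ext") or "")


print(original_upload_name("products (1).csv"))
```

A file that legitimately ends in ` (1)` would also be rewritten, so cleaning the directory remains the safer approach.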

In [None]:
# Decode csv
def get_csv_content(file_client):
    download = file_client.download_file()
    file_content = download.readall()

    # chardet may return None when it cannot guess; fall back to UTF-8
    result = chardet.detect(file_content)
    encoding = result["encoding"] or "utf-8"

    return file_content.decode(encoding)
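
`chardet.detect` returns a best guess, and its `encoding` value can be `None` when detection fails. As a dependency-free alternative, the sketch below tries a short list of encodings in order. The helper name `decode_with_fallback` and the choice of candidate encodings are assumptions, not part of the original notebook.

```python
def decode_with_fallback(data: bytes, encodings=("utf-8-sig", "cp1252")) -> str:
    """Decode bytes by trying each candidate encoding in order."""
    for encoding in encodings:
        try:
            # "utf-8-sig" also accepts plain UTF-8 and drops a leading BOM.
            return data.decode(encoding)
        except UnicodeDecodeError:
            continue
    # latin-1 maps every byte to a code point, so this last resort never raises.
    return data.decode("latin-1")


print(decode_with_fallback("café".encode("utf-8")))
```

Unlike statistical detection, this approach is deterministic, but it only works well when the likely encodings are known in advance.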

In [None]:
# Decode docx
def get_docx_content(file_client):
    download = file_client.download_file()
    file_content = download.readall()

    temp_file_path = "temp.docx"
    with open(temp_file_path, "wb") as temp_file:
        temp_file.write(file_content)

    doc = Document(temp_file_path)
    text = []
    for paragraph in doc.paragraphs:
        text.append(paragraph.text)

    return "\n".join(text)
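
The function above writes `temp.docx` into the working directory and leaves it there. One way to avoid leftover files, sketched here with the standard library only, is to write the download into a temporary directory that is removed as soon as the loader returns. `load_via_temp_file` and its `loader` parameter are hypothetical names introduced for this illustration.

```python
import os
import tempfile


def load_via_temp_file(content: bytes, loader, suffix: str = ".docx"):
    """Write content to a throwaway file, pass its path to loader, then clean up."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = os.path.join(tmp_dir, f"download{suffix}")
        with open(tmp_path, "wb") as tmp_file:
            tmp_file.write(content)
        # The loader must finish reading before the directory is deleted.
        return loader(tmp_path)


# Example with a trivial loader that reports the file size in bytes.
print(load_via_temp_file(b"hello", os.path.getsize, suffix=".bin"))
```

For the DOCX case, the loader would be a function that opens the document at the given path and returns the joined paragraph text, so nothing depends on the file after cleanup.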

## Uploading data to OneLake

Before uploading your files, it is highly recommended to clean the Google Colab directory.

In [None]:
clean_directory("/content")

Upload the csv file:

In [None]:
# Upload csv to Colab

# Upload one local file
uploaded = files.upload()
csv_file_name = list(uploaded.keys())[0]

# Google Colab file path
csv_local_path = f"/content/{csv_file_name}"

Saving products.csv to products.csv


Upload the 3 docx files:

In [None]:
# Upload docx files to Colab

uploaded = files.upload()
docx_files = list(uploaded.keys())

docx_local_paths = [f"/content/{file_name}" for file_name in docx_files]

Saving sport-sneakers.docx to sport-sneakers.docx
Saving classic-loafers.docx to classic-loafers.docx
Saving beach-flip-flops.docx to beach-flip-flops.docx


In [None]:
# Upload files to Lakehouse
upload_file_to_directory(directory_client, csv_local_path, csv_file_name)

for docx_local_path in docx_local_paths:
    docx_file_name = docx_local_path.split("/")[-1]
    upload_file_to_directory(directory_client, docx_local_path, docx_file_name)

File: products.csv uploaded to the data lake.
File: sport-sneakers.docx uploaded to the data lake.
File: classic-loafers.docx uploaded to the data lake.
File: beach-flip-flops.docx uploaded to the data lake.


Check that the files are now available in your data lake.

In [None]:
list_directory_contents(file_system_client, ONELAKE_DATA_PATH)

shoesticDatalake.Lakehouse/Files/ProductsData/beach-flip-flops.docx

shoesticDatalake.Lakehouse/Files/ProductsData/classic-loafers.docx

shoesticDatalake.Lakehouse/Files/ProductsData/products.csv

shoesticDatalake.Lakehouse/Files/ProductsData/sport-sneakers.docx



## Indexing data to Elasticsearch

## Creating embeddings endpoint

In [None]:
try:
    es_client.options(
        request_timeout=60, max_retries=3, retry_on_timeout=True
    ).inference.put_model(
        inference_id="onelake-inference-endpoint",
        body={
            "task_type": "sparse_embedding",
            "service": "elser",
            "service_settings": {
                "model_id": ".elser_model_2_linux-x86_64",
                "num_allocations": 1,
                "num_threads": 1,
            },
        },
    )

    print("Embedding endpoint created successfully.")
except exceptions.BadRequestError as e:
    raise e

Embedding endpoint created successfully.


## Creating Mappings

In [None]:
# For data in csv
try:
    es_client.indices.create(
        index="shoestic-products",
        body={
            "mappings": {
                "properties": {
                    "product_id": {"type": "keyword"},
                    "product_name": {"type": "text"},
                    "amount": {"type": "float"},
                    "tags": {"type": "keyword"},
                }
            }
        },
    )
except exceptions.RequestError as e:
    if e.error == "resource_already_exists_exception":
        print("Index already exists.")
    else:
        raise e

In [None]:
# For data in docx
try:
    es_client.indices.create(
        index="shoestic-products-descriptions",
        body={
            "mappings": {
                "properties": {
                    "title": {"type": "text", "analyzer": "english"},
                    "super_body": {
                        "type": "semantic_text",
                        "inference_id": "onelake-inference-endpoint",
                    },
                    "body": {"type": "text", "copy_to": "super_body"},
                }
            }
        },
    )
except exceptions.RequestError as e:
    if e.error == "resource_already_exists_exception":
        print("Index already exists.")
    else:
        raise e

## Indexing data

### Retrieving documents from OneLake

In [None]:
# Getting files from Lakehouse
csv_file_client = get_file_by_name("products.csv", directory_client)

docx_files_clients = []

for docx_file_name in docx_files:
    docx_files_clients.append(get_file_by_name(docx_file_name, directory_client))

### Extract content

In [None]:
csv_content = get_csv_content(csv_file_client)
reader = csv.DictReader(StringIO(csv_content))

docx_contents = []

for docx_file_client in docx_files_clients:
    docx_contents.append(get_docx_content(docx_file_client))

print("CSV FILE CONTENT: ", csv_content)
print("DOCX FILE CONTENT: ", docx_contents)

CSV FILE CONTENT:  product_id,product_name,amount,tags
P-115,Classic Loafers,19.69,"classic,formal"
P-114,Sport Sneakers,156.18,"sport,casual"
P-109,Ankle Boots,87.13,"boots,winter"
P-118,Casual Sandals,128.22,"casual,summer"
P-112,Running Shoes,132.27,"sport,running"
P-116,Leather Sandals,107.63,"leather,casual"
P-105,Canvas Sneakers,75.56,"casual,sneakers"
P-107,Winter Boots,49.57,"winter,outdoor"
P-111,Formal Oxfords,177.70,"formal,classic"
P-119,Beach Flip-Flops,50.64,"beach,summer"

DOCX FILE CONTENT:  ['Crafted for an active lifestyle, this footwear offers excellent support and durability, featuring a non-slip sole that’s perfect for both workouts and casual use.', 'With a timeless design, this elegant choice pairs well with professional and social attire. Made from high-quality leather, it adds a touch of sophistication.', 'Ideal for warm, sunny days by the water, these lightweight essentials are water-resistant and come in bright colors, bringing a laid-back vibe to any outing 

In [None]:
# The tags must be an array
rows = csv_content.splitlines()
reader = csv.DictReader(rows)

modified_rows = []

for row in reader:
    row["tags"] = row["tags"].replace('"', "").split(",")
    modified_rows.append(row)

    print(row["tags"])

reader = modified_rows

['classic', 'formal']
['sport', 'casual']
['boots', 'winter']
['casual', 'summer']
['sport', 'running']
['leather', 'casual']
['casual', 'sneakers']
['winter', 'outdoor']
['formal', 'classic']
['beach', 'summer']
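
The same transformation can be written as a reusable function. The sketch below relies on the fact that `csv.DictReader` already removes the quotes around a quoted field, so splitting on commas is enough. It also converts `amount` to a number, which is an optional step added here because the mapping declares that field as `float` while the `csv` module yields strings. `normalize_rows` and `SAMPLE_CSV` are hypothetical names.

```python
import csv
from io import StringIO

# A two-line sample in the same shape as products.csv.
SAMPLE_CSV = (
    "product_id,product_name,amount,tags\n"
    'P-115,Classic Loafers,19.69,"classic,formal"\n'
)


def normalize_rows(csv_text: str) -> list:
    """Parse CSV text and turn tags into a list and amount into a float."""
    rows = []
    for row in csv.DictReader(StringIO(csv_text)):
        # The csv module has already stripped the surrounding quotes.
        row["tags"] = [tag.strip() for tag in row["tags"].split(",") if tag.strip()]
        row["amount"] = float(row["amount"])
        rows.append(row)
    return rows


print(normalize_rows(SAMPLE_CSV))
```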


### Indexing data

In [None]:
csv_actions = [{"_index": "shoestic-products", "_source": row} for row in reader]

docx_actions = [
    {
        "_index": "shoestic-products-descriptions",
        "_source": {"title": docx_file_name, "body": docx},
    }
    for docx_file_name, docx in zip(docx_files, docx_contents)
]


helpers.bulk(es_client, csv_actions)
print("CSV data indexed successfully.")
helpers.bulk(es_client, docx_actions)
print("DOCX data indexed successfully.")

CSV data indexed successfully.
DOCX data indexed successfully.
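
Because the actions above carry no `_id`, Elasticsearch generates one for each document, so running the cell twice indexes every product twice. A possible variation, sketched below, reuses `product_id` as the document `_id` so that a rerun overwrites documents instead of duplicating them. `build_product_actions` is a hypothetical helper, and using `product_id` as a unique key is an assumption about the data.

```python
def build_product_actions(rows, index_name="shoestic-products"):
    """Yield bulk actions whose _id is taken from each row's product_id."""
    for row in rows:
        yield {
            "_index": index_name,
            "_id": row["product_id"],  # Assumed to be unique per product.
            "_source": row,
        }


sample_rows = [{"product_id": "P-115", "product_name": "Classic Loafers"}]
for action in build_product_actions(sample_rows):
    print(action)
```

The generator can be passed straight to the bulk helper, for example `helpers.bulk(es_client, build_product_actions(reader))`.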


### Retrieving data from Elasticsearch

Semantic search: let's search for the word 'summer' and see what results the search returns.

In [None]:
response = es_client.search(
    index="shoestic-products-descriptions",
    body={
        "size": 1,
        "_source": {"excludes": ["*embeddings", "*chunks"]},
        "query": {"semantic": {"field": "super_body", "query": "summer"}},
    },
)

print(response["hits"]["hits"][0]["_source"]["body"])

Ideal for warm, sunny days by the water, these lightweight essentials are water-resistant and come in bright colors, bringing a laid-back vibe to any outing in the sun.
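
Indexing `hits[0]` directly raises an `IndexError` when a search returns no hits. A small guard, sketched below, avoids that. `first_hit_source` is a hypothetical helper, and it only assumes the `hits.hits[]._source` layout of the search response used above.

```python
def first_hit_source(response, field=None):
    """Return the first hit's _source (or one field of it), or None if no hits."""
    hits = response["hits"]["hits"]
    if not hits:
        return None
    source = hits[0]["_source"]
    return source if field is None else source.get(field)


# Plain dictionaries stand in for search responses in this example.
empty_response = {"hits": {"hits": []}}
one_hit_response = {"hits": {"hits": [{"_source": {"body": "Ideal for warm days"}}]}}

print(first_hit_source(empty_response, "body"))
print(first_hit_source(one_hit_response, "body"))
```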


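Now let's search for the same word in the product tags. Because `tags` is a `keyword` field, the `term` query below is an exact match rather than a full-text search. Both searches return summer-related products.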

In [None]:
response = es_client.search(
    index="shoestic-products", body={"size": 1, "query": {"term": {"tags": "summer"}}}
)

print(response["hits"]["hits"][0]["_source"])

{'product_id': 'P-118', 'product_name': 'Casual Sandals', 'amount': '128.22', 'tags': ['casual', 'summer']}
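
To filter on more than one tag, a `terms` query matches documents that contain any of the listed values. The sketch below builds the request body for either case. `build_tag_query` is a hypothetical helper, and the `tags` field name comes from the mapping created earlier.

```python
def build_tag_query(tags, size=1):
    """Build a search body matching one tag (term) or any of several (terms)."""
    if isinstance(tags, str):
        tags = [tags]  # Accept a single tag passed as a plain string.
    tags = list(tags)
    if not tags:
        raise ValueError("At least one tag is required.")
    if len(tags) == 1:
        query = {"term": {"tags": tags[0]}}
    else:
        query = {"terms": {"tags": tags}}
    return {"size": size, "query": query}


print(build_tag_query("summer"))
print(build_tag_query(["summer", "beach"], size=5))
```

Both query types compare against the stored keyword exactly, so `Summer` would not match a document tagged `summer`.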


## Cleanup

Finally, we can delete the indices and the inference endpoint so they stop consuming cluster resources.

In [None]:
# Cleanup - Delete Index
# The per-request `ignore` argument is deprecated in the 8.x client;
# `.options(ignore_status=...)` is the supported replacement.
es_client.options(ignore_status=[400, 404]).indices.delete(index="shoestic-products")
es_client.options(ignore_status=[400, 404]).indices.delete(
    index="shoestic-products-descriptions"
)

# Cleanup - Delete Embeddings Endpoint
es_client.options(ignore_status=[400, 404]).inference.delete_model(
    inference_id="onelake-inference-endpoint"
)


ObjectApiResponse({'acknowledged': True, 'pipelines': []})