In [2]:
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
from IPython.display import display, HTML, JSON, Markdown

from dotenv import load_dotenv
import pandas as pd
import os

# Merge settings from a local .env file (when one exists) into the process
# environment before reading them.
load_dotenv()

# Connection settings for Azure Data Explorer. Each name is bound to None when
# its environment variable is not set.
(
    AAD_TENANT_ID,
    KUSTO_CLUSTER,
    KUSTO_INGEST_URI,
    KUSTO_DATABASE,
    KUSTO_MANAGED_IDENTITY_APP_ID,
    KUSTO_MANAGED_IDENTITY_SECRET,
) = (
    os.environ.get(variable)
    for variable in (
        "AAD_TENANT_ID",
        "KUSTO_CLUSTER",
        "KUSTO_INGEST_URI",
        "KUSTO_DATABASE",
        "KUSTO_MANAGED_IDENTITY_APP_ID",
        "KUSTO_MANAGED_IDENTITY_SECRET",
    )
)

In [3]:
# Names shared by the cells below: target database and a table name.
kusto_db = KUSTO_DATABASE
table_name = "impressions"

# Build a client for the cluster endpoint, authenticating as the AAD app
# registration (application id + secret) within the configured tenant.
cluster = KUSTO_CLUSTER
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    cluster,
    KUSTO_MANAGED_IDENTITY_APP_ID,
    KUSTO_MANAGED_IDENTITY_SECRET,
    AAD_TENANT_ID,
)
client = KustoClient(kcsb)

In [4]:
def jaccard_set(list1, list2):
    """Return the Jaccard similarity of two collections, treated as sets.

    The score is ``|A & B| / |A | B|`` where ``A`` and ``B`` are the sets of
    distinct elements of ``list1`` and ``list2``. Duplicate elements in either
    input do not affect the result.

    Args:
        list1: Iterable of hashable items.
        list2: Iterable of hashable items.

    Returns:
        A float in ``[0.0, 1.0]``. When both inputs are empty the union is
        empty and the score is defined here as ``0.0`` (no shared evidence)
        instead of raising ``ZeroDivisionError``.
    """
    set1 = set(list1)
    set2 = set(list2)
    union = set1 | set2
    if not union:
        return 0.0
    # Sizes are taken from the sets, not the raw inputs, so repeated elements
    # cannot inflate the denominator.
    return len(set1 & set2) / len(union)

In [5]:
# For every product, collect the list of sales orders in which it appears.
query = """
external_table("Orders")
| summarize ordersWhereFound = make_list(SalesOrderID) by ProductID
"""
response = client.execute(kusto_db, query)
df = dataframe_from_result_table(response.primary_results[0])

# Pair each product with its order list once, so the nested iteration below
# does not depend on the frame's index labels.
products = list(zip(df['ProductID'], df['ordersWhereFound']))

# Score every ordered pair of distinct products (both A->B and B->A are
# emitted). Records are accumulated in a list and the frame is built once,
# rather than appending row by row.
records = [
    {
        'ProductID1': product1,
        'ProductID2': product2,
        'jaccard_similarity': jaccard_set(orders1, orders2),
    }
    for product1, orders1 in products
    for product2, orders2 in products
    if product1 != product2
]
result_df = pd.DataFrame(records, columns=['ProductID1', 'ProductID2', 'jaccard_similarity'])

# Make sure the output directory exists before writing the CSV.
os.makedirs('./data', exist_ok=True)
result_df.to_csv('./data/jaccard_similarity.csv', index=False)

In [None]:
import pandas as pd
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.kusto.data.data_format import DataFormat
from azure.kusto.ingest import (
    BlobDescriptor,
    FileDescriptor,
    IngestionProperties,
    IngestionStatus,
    KustoStreamingIngestClient,
    ManagedStreamingIngestClient,
    QueuedIngestClient,
    StreamDescriptor,
)

# Connect to adx using AAD app registration for ingestion - use ingest URI.
# Ingestion-specific names are used so the query connection (`kcsb`, `client`)
# and the query result (`df`) from the earlier cells are not overwritten; the
# earlier cells can then be re-run after this one.
ingest_kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    KUSTO_INGEST_URI,
    KUSTO_MANAGED_IDENTITY_APP_ID,
    KUSTO_MANAGED_IDENTITY_SECRET,
    AAD_TENANT_ID,
)
kusto_db = KUSTO_DATABASE

DESTINATION_TABLE = "productRecommendations"
# Name of a column mapping; not referenced by the ingestion properties below.
DESTINATION_TABLE_COLUMN_MAPPING = "productRecommendations_mapping"

ingest_client = QueuedIngestClient(ingest_kcsb)

# there are a lot of useful properties, make sure to go over docs and check them out
ingestion_props = IngestionProperties(
    database=KUSTO_DATABASE,
    table=DESTINATION_TABLE,
    data_format=DataFormat.CSV,
    # in case status update for success are also required (remember to import ReportLevel from azure.kusto.ingest)
    # report_level=ReportLevel.FailuresAndSuccesses,
    # in case a mapping is required (remember to import IngestionMappingKind from azure.kusto.data.data_format)
    # ingestion_mapping_reference="{json_mapping_that_already_exists_on_table}",
    # ingestion_mapping_kind= IngestionMappingKind.JSON,
)

# Load the similarity scores written by the previous cell and queue them for
# ingestion into the destination table.
recommendations_df = pd.read_csv("./data/jaccard_similarity.csv")

ingest_client.ingest_from_dataframe(recommendations_df, ingestion_properties=ingestion_props)