In [None]:
import os

# Echo the two environment variables the notebook depends on; a missing
# variable raises KeyError right here instead of further down.
for env_name in ('PBI_SERVER', 'PBI_DB'):
    print(os.environ[env_name])

In [None]:
import ssas_api as powerbi

# NOTE(review): presumably this loads the .NET assemblies that the later
# `import System` / `import Microsoft.AnalysisServices.Tabular` cell relies on,
# so it has to run before that cell — confirm against the ssas_api module.
powerbi.load_libraries()

In [None]:
# Connection settings come from the environment so nothing is hard-coded here.
powerbi_server = os.environ['PBI_SERVER']
powerbi_db_name = os.environ['PBI_DB']

# MSOLAP connection string; the catalog is left empty on purpose and the
# database is picked later from the server's database collection.
conn = f"Provider=MSOLAP;Data Source={powerbi_server};Initial Catalog='';"
print(conn)


In [None]:
import System
import Microsoft.AnalysisServices.Tabular as tom

# Open a Tabular Object Model (TOM) connection using the string built above.
tom_server = tom.Server()
tom_server.Connect(conn)

# List every database exposed by the server so the reader can check which
# one PBI_DB is expected to match.
for item in tom_server.Databases:
    print("Database: ", item.Name)
    print("Compatibility Level: ", item.CompatibilityLevel)
    print("Created: ", item.CreatedTimestamp)
    print()

# The string indexer `Databases[...]` resolves a database by its ID, which is
# not always equal to its Name. Resolve by name first and fall back to the ID
# so that either value works in PBI_DB.
tom_database = tom_server.Databases.FindByName(powerbi_db_name)
if tom_database is None:
    tom_database = tom_server.Databases.Find(powerbi_db_name)
if tom_database is None:
    available = [item.Name for item in tom_server.Databases]
    raise LookupError(
        f"Database '{powerbi_db_name}' was not found on the server; "
        f"available databases: {available}"
    )


In [None]:
import pandas as pd

def map_column(column):
    """Flatten a TOM column object into a plain dict (one DataFrame row).

    Args:
        column: object exposing `Table.Name`, `Name`, `IsKey` and `IsHidden`.

    Returns:
        dict with the keys 'Tabla', 'Nombre', 'Llave' and 'Oculta', in that
        order, holding the corresponding attribute values.
    """
    return dict(
        Tabla=column.Table.Name,
        Nombre=column.Name,
        Llave=column.IsKey,
        Oculta=column.IsHidden,
    )

# One record per column, walking the model table by table.
columns = []
for table in tom_database.Model.Tables:
    columns.extend(map_column(column) for column in table.Columns)

df_columns = pd.DataFrame(columns)
display(df_columns)


In [None]:
import pandas as pd
from System import Enum

def map_measure(measure):
    """Flatten a TOM measure object into a plain dict (one DataFrame row).

    Args:
        measure: object exposing `Table.Name`, `DisplayFolder`, `Name`,
            `IsHidden`, `IsSimpleMeasure`, `ModifiedTime`, `DataType`,
            `FormatString` and `Expression`.

    Returns:
        dict keyed by the Spanish column labels used throughout the notebook.
        'TipoDato' holds the member name of `tom.DataType` for the measure's
        data type, as returned by `Enum.GetName`.
    """
    record = {}
    record['Tabla'] = measure.Table.Name
    record['Carpeta'] = measure.DisplayFolder
    record['Nombre'] = measure.Name
    record['Oculta'] = measure.IsHidden
    record['Implicita'] = measure.IsSimpleMeasure
    record['FechaModificacion'] = measure.ModifiedTime
    # Store the enum member's name rather than the raw enum value.
    record['TipoDato'] = Enum.GetName(tom.DataType, measure.DataType)
    record['Formato'] = measure.FormatString
    record['Expresion'] = measure.Expression
    return record

# One record per measure, walking the model table by table.
measures = []
for table in tom_database.Model.Tables:
    for measure in table.Measures:
        measures.append(map_measure(measure))

df_measures = pd.DataFrame(measures)
display(df_measures)


In [None]:
# Three example filters over the measure catalogue:
# names starting with 'Net', names containing 'Sales', and hidden measures.
display(df_measures.loc[lambda frame: frame['Nombre'].str.startswith('Net')])
display(df_measures.loc[lambda frame: frame['Nombre'].str.contains('Sales')])
display(df_measures.loc[lambda frame: frame['Oculta']])

In [None]:
# Write both catalogues to one workbook, one sheet each, without the index.
with pd.ExcelWriter('./powerbi_tom.xlsx') as writer:
    for sheet_name, frame in (('Columnas', df_columns), ('Medidas', df_measures)):
        frame.to_excel(writer, sheet_name=sheet_name, index=False)

In [None]:
# Keep the live TOM measure objects (not DataFrame rows) whose name starts
# with 'Net', so that later cells can modify them.
tom_measures_net = [
    candidate
    for model_table in tom_database.Model.Tables
    for candidate in model_table.Measures
    if candidate.Name.startswith('Net')
]
display(tom_measures_net)

In [None]:
# Current (name, display folder) pair of every 'Net' measure.
display([
    (net_measure.Name, net_measure.DisplayFolder)
    for net_measure in tom_measures_net
])

In [None]:
# Every measure of the model, as live TOM objects.
tom_measures = [
    model_measure
    for model_table in tom_database.Model.Tables
    for model_measure in model_table.Measures
]

# Collect (measure name, annotation name, annotation value) for the measures
# carrying the annotation 'Creada con' = 'Tabular Editor'.
measure_tablular_editor = []
for measure in tom_measures:
    for annotation in measure.Annotations:
        if annotation.Name == 'Creada con' and annotation.Value == 'Tabular Editor':
            measure_tablular_editor.append((measure.Name, annotation.Name, annotation.Value))
display(measure_tablular_editor)

In [None]:
# Move every 'Net' measure into the 'Net' display folder. This only edits the
# local objects held in tom_measures_net.
for measure in tom_measures_net:
    measure.DisplayFolder = 'Net'

# Show the (name, folder) pairs again to confirm the assignment.
display([
    (updated.Name, updated.DisplayFolder)
    for updated in tom_measures_net
])

In [None]:
# Push the pending local model edits (the DisplayFolder changes made in the
# previous cell) to the server. The call is the cell's last expression, so the
# notebook echoes whatever SaveChanges() returns.
tom_database.Model.SaveChanges()

In [None]:

# NOTE(review): presumably Refresh(True) re-reads the server metadata in full
# before the model is edited — confirm against the AMO documentation.
tom_server.Refresh(True)

# Base measures for which a year-to-date variant is generated.
# NOTE(review): this rebinds `tom_measures`, which an earlier cell used for
# the list of all measures.
tom_measures = [measure for table in tom_database.Model.Tables for measure in table.Measures if measure.Name in ['Returns']]
display(len(tom_measures))

# NOTE(review): this table is looked up and displayed, but the new measures
# are added to the base measure's own table, not to this one — confirm intent.
tom_table = tom_database.Model.Tables.Find('Analysis DAX')
display(tom_table)

# Names already present anywhere in the model. Adding a second measure with
# the same name fails, which made this cell break when it was run twice.
existing_measure_names = {measure.Name for table in tom_database.Model.Tables for measure in table.Measures}

for measure in tom_measures:
    new_measure_name = f'{measure.Name} YTD'
    if new_measure_name in existing_measure_names:
        display(f'La medida "{new_measure_name}" ya existe; se omite')
        continue

    new_measure = tom.Measure()

    new_measure.Name = new_measure_name
    new_measure.Expression = f"TOTALYTD ( [{measure.Name}], 'Calendar'[Date] )"
    new_measure.DisplayFolder = measure.DisplayFolder
    new_measure.Description = f'Calcula {measure.Name} desde el inicio del año'

    measure.Table.Measures.Add(new_measure)
    existing_measure_names.add(new_measure_name)

    # The original message had stray doubled quotes after the table and folder names.
    display(f'La medida "{new_measure.Name}" se añadió a la tabla "{new_measure.Table.Name}" y a la carpeta "{new_measure.DisplayFolder}"')

# Persist the additions (if any) to the server.
tom_database.Model.SaveChanges()


In [None]:
# Show the measure catalogue again before the dependency analysis.
# NOTE(review): df_measures was built in an earlier cell, before the model was
# edited (DisplayFolder changes, added YTD measure), so it does not reflect
# those edits — re-run the measure-extraction cell first if an up-to-date
# view is needed.
display(df_measures)

In [None]:
    
# For every measure, list the names of the measures whose DAX expression
# contains the literal text "[<measure name>]", i.e. the measures that
# reference it. The result is stored in the new 'ref' column.
measure_names = df_measures['Nombre']
measure_expressions = df_measures['Expresion']


def _referencing_measures(name):
    """Return the names of the measures whose expression contains '[name]'."""
    # regex=False: the brackets are matched literally.
    # na=False: a measure with a missing expression counts as "no match"
    # instead of producing a NaN mask, which boolean indexing rejects.
    mask = measure_expressions.str.contains('[' + name + ']', regex=False, na=False)
    return list(measure_names[mask])


df_measures['ref'] = measure_names.apply(_referencing_measures)
display(df_measures)



In [None]:
import networkx as nx

# Directed dependency graph: one node per measure, and one edge from each
# measure listed in a row's 'ref' to that row's measure.
G = nx.DiGraph()
G.add_nodes_from(df_measures['Nombre'].tolist())

for _, row in df_measures.iterrows():
    # An empty 'ref' list simply adds no edges.
    G.add_edges_from((source_name, row['Nombre']) for source_name in row['ref'])

nx.draw(G, with_labels=True)

In [None]:
# Measure whose direct neighbourhood is drawn. Only one assignment is active;
# the alternatives are kept as comments (the original assigned the first one
# and immediately overwrote it).
# measure_to_search = 'Returns Variance %'
measure_to_search = 'Net Sales PM'
# measure_to_search = 'Net Sales'

# An unknown name would otherwise produce an empty plot with no explanation.
if measure_to_search not in G:
    raise KeyError(f'La medida "{measure_to_search}" no existe en el grafo de dependencias')

MG = nx.DiGraph()
# Add the node explicitly so a measure without any edge is still drawn.
MG.add_node(measure_to_search)
MG.add_edges_from(G.in_edges(measure_to_search))
MG.add_edges_from(G.out_edges(measure_to_search))
nx.draw(MG, with_labels=True)

In [None]:
# Flatten the 'ref' lists into (source, target) pairs, in row order, and
# export them as a two-column edge list.
graph = [
    (source_name, row['Nombre'])
    for _, row in df_measures.iterrows()
    for source_name in row['ref']
]

df_graph = pd.DataFrame(graph, columns=['Nodo Fuente', 'Nodo Destino'])
df_graph.to_csv('./Grafo de Medidas.csv', index=False)
