## ROOT Implementation

In [1]:
import ROOT
import os

# Distributed RDataFrame implementation from ROOT's experimental OSCAR backend.
RDataFrame = ROOT.RDF.Experimental.Distributed.OSCAR.RDataFrame

# Client configuration handed to the OSCAR RDataFrame. Endpoints and credentials
# are read from environment variables so they never appear in the notebook.
oscarclient = dict(
    minio_endpoint=os.environ['minio_endpoint'],
    minio_access=os.environ['minio_access'],
    minio_secret=os.environ['minio_secret'],
    bucket_name='root-oscar',
    # Optional arguments
    benchmarking=True,
    oscar_endpoint=os.environ['oscar_endpoint'],
    oscar_access=os.environ['oscar_access'],
    oscar_secret=os.environ['oscar_secret'],
)

Welcome to JupyROOT 6.27/01


In [2]:
def dimuon_analysis(df, node_count=None):
    """Run the CMS open-data dimuon analysis on ``df`` and save the mass spectrum.

    Keeps events with exactly two muons of opposite charge, defines their
    invariant mass, fills a 30000-bin histogram over [0.25, 300] GeV and draws
    it on log-log axes together with the resonance labels.

    Parameters
    ----------
    df : RDataFrame
        Local or distributed dataframe exposing the NanoAOD columns ``nMuon``,
        ``Muon_charge``, ``Muon_pt``, ``Muon_eta``, ``Muon_phi`` and ``Muon_mass``.
    node_count : optional
        Tag appended to the output file name (``dimuon_spectrum_{node_count}.pdf``)
        so runs with different partition counts do not overwrite each other.
        When omitted, the plot is written to ``dimuon_spectrum.pdf``.

    Side effects
    ------------
    Prints the wall time of the event loop and writes the PDF into the current
    working directory.
    """
    # For simplicity, select only events with exactly two muons and require opposite charge
    df_2mu = df.Filter("nMuon == 2", "Events with exactly two muons")
    df_os = df_2mu.Filter("Muon_charge[0] != Muon_charge[1]", "Muons with opposite charge")

    # Compute invariant mass of the dimuon system
    df_mass = df_os.Define("Dimuon_mass", "InvariantMass(Muon_pt, Muon_eta, Muon_phi, Muon_mass)")

    # Book the histogram; title and axis labels are set in one go.
    h = df_mass.Histo1D(("Dimuon_mass", "Dimuon mass;m_{#mu#mu} (GeV);N_{Events}", 30000, 0.25, 300), "Dimuon_mass")

    # Produce plot
    ROOT.gStyle.SetOptStat(0)
    ROOT.gStyle.SetTextFont(42)
    c = ROOT.TCanvas("c", "", 800, 700)
    c.SetLogx()
    c.SetLogy()

    # Booked actions are lazy: fetching the histogram value is what runs the
    # event loop, so the stopwatch brackets exactly that computation.
    watch = ROOT.TStopwatch()
    hist = h.GetValue()
    hist.SetTitle("")
    print(f"Time elapsed {watch.RealTime()}")
    hist.GetXaxis().SetTitleSize(0.04)
    hist.GetYaxis().SetTitleSize(0.04)
    hist.Draw()

    label = ROOT.TLatex()
    label.SetNDC(True)
    # Resonance labels, hand-placed in normalized (NDC) canvas coordinates.
    resonance_labels = (
        (0.175, 0.740, "#eta"),
        (0.205, 0.775, "#rho,#omega"),
        (0.270, 0.740, "#phi"),
        (0.400, 0.800, "J/#psi"),
        (0.415, 0.670, "#psi'"),
        (0.485, 0.700, "Y(1,2,3S)"),
        (0.755, 0.680, "Z"),
    )
    for x_ndc, y_ndc, text in resonance_labels:
        label.DrawLatex(x_ndc, y_ndc, text)
    label.SetTextSize(0.040)
    label.DrawLatex(0.100, 0.920, "#bf{CMS Open Data}")
    label.SetTextSize(0.030)
    label.DrawLatex(0.630, 0.920, "#sqrt{s} = 8 TeV, L_{int} = 11.6 fb^{-1}")

    suffix = "" if node_count is None else f"_{node_count}"
    c.SaveAs(f"dimuon_spectrum{suffix}.pdf")

In [None]:
# Build the distributed dataframe over the CMS open-data NanoAOD sample,
# split into 8 partitions.
treename = "Events"
filenames = [
    "root://eospublic.cern.ch//eos/opendata/cms/derived-data/AOD2NanoAODOutreachTool/Run2012BC_DoubleMuParked_Muons.root",
]
# filenames = filenames * 4  # repeat the file list 4 times to enlarge the workload
df = RDataFrame(treename, filenames, oscarclient=oscarclient, npartitions=8)

In [None]:
# Run the analysis: accessing the histogram inside the function triggers the
# distributed event loop, and the plot is saved as dimuon_spectrum.pdf.
dimuon_analysis(df)

In [None]:
# Plot generation
import csv
import sys
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Resource-usage samples ('|'-separated; columns include function, id, time,
# cpu_percent and mem_percent), presumably written by the benchmarking run.
# Earlier run, kept for reference:
# df = pd.read_csv('fe511af1-a485-48eb-89fe-9067c2bb457f_usage.csv', delimiter='|')
usage_csv = 'd132ba4a-8b6b-4916-b8d0-398d00a193ab_usage.csv'
df = pd.read_csv(usage_csv, sep='|')
df.head()

In [None]:
# Per-mapper CPU and memory traces from the usage table, one panel per mapper.
mapper_ids = ['0_0', '1_1', '2_2', '3_3']
n_grid_cols = 2
# Ceiling division: enough rows to hold every mapper panel.
n_grid_rows = (len(mapper_ids) + n_grid_cols - 1) // n_grid_cols
# Titles are derived from the id list so panels and labels cannot drift apart.
subplot_titles = [f'Mapper {index}' for index in range(len(mapper_ids))]

mappers = df[df['function'] == 'mapper']

cpu_figs = make_subplots(rows=n_grid_rows, cols=n_grid_cols,
                         subplot_titles=subplot_titles,
                         x_title='Time (s)', y_title='CPU Usage (%)')

mem_figs = make_subplots(rows=n_grid_rows, cols=n_grid_cols,
                         subplot_titles=subplot_titles,
                         x_title='Time (s)', y_title='Memory Usage (%)')

# `mapper_id` rather than `id` so the builtin is not shadowed at notebook scope.
for index, mapper_id in enumerate(mapper_ids):
    mapper_data = mappers[mappers['id'] == mapper_id]
    # Fill the grid row by row; plotly rows/cols are 1-based.
    grid_row, grid_col = divmod(index, n_grid_cols)
    row, col = grid_row + 1, grid_col + 1

    cpu_figs.add_trace(
        go.Scatter(x=mapper_data['time'],
                   y=mapper_data['cpu_percent'],
                   showlegend=False),
        row=row, col=col
    )

    mem_figs.add_trace(
        go.Scatter(x=mapper_data['time'],
                   y=mapper_data['mem_percent'],
                   showlegend=False),
        row=row, col=col
    )

In [None]:
# Render the CPU-usage grid built above (one panel per mapper).
cpu_figs.show()

In [None]:
# Render the memory-usage grid built above (one panel per mapper).
mem_figs.show()

In [None]:
# Per-function process counters ('|'-separated), one row per function/id/phase.
# NOTE(review): this run id (fe511af1-...) differs from the usage CSV loaded
# earlier (d132ba4a-...); confirm both tables are meant to describe the same run.
process_csv = 'fe511af1-a485-48eb-89fe-9067c2bb457f_process.csv'
df2 = pd.read_csv(process_csv, sep='|')
df2.head()

In [None]:
# Identifying columns shared by every per-metric view below.
ids = ['function', 'id', 'phase']
# Memory view: identifying columns plus every 'mem_'-prefixed metric.
mem_cols = [*ids, *(name for name in df2 if name.startswith('mem_'))]
df2_mem = df2[mem_cols]
df2_mem

In [None]:
# Network view: identifying columns plus every 'net_'-prefixed metric.
net_io_cols = ids + list(filter(lambda name: name.startswith('net_'), df2))
df2_net_io = df2[net_io_cols]
df2_net_io.head()

In [None]:
# I/O view: identifying columns plus every 'io_'-prefixed metric.
io_metric_cols = [name for name in df2.columns if name.startswith('io_')]
io_cols = ids + io_metric_cols
df2_io = df2[io_cols]
df2_io.head()

In [None]:
# 'ctx_'-prefixed metrics (presumably context-switch counters) with identifying columns.
ctx_metric_cols = [name for name in df2.columns if name.startswith('ctx_')]
ctx_cols = ids + ctx_metric_cols
df2_ctx = df2[ctx_cols]
df2_ctx.head()

## Testing with the dataset in local MinIO

In [1]:
def dimuon_analysis(df, node_count=None):
    """Run the CMS open-data dimuon analysis on ``df`` and save the mass spectrum.

    Keeps events with exactly two muons of opposite charge, defines their
    invariant mass, fills a 30000-bin histogram over [0.25, 300] GeV and draws
    it on log-log axes together with the resonance labels.

    Parameters
    ----------
    df : RDataFrame
        Local or distributed dataframe exposing the NanoAOD columns ``nMuon``,
        ``Muon_charge``, ``Muon_pt``, ``Muon_eta``, ``Muon_phi`` and ``Muon_mass``.
    node_count : optional
        Tag appended to the output file name (``dimuon_spectrum_{node_count}.pdf``)
        so runs with different partition counts do not overwrite each other.
        When omitted, the plot is written to ``dimuon_spectrum.pdf``.

    Side effects
    ------------
    Prints the wall time of the event loop and writes the PDF into the current
    working directory.
    """
    # For simplicity, select only events with exactly two muons and require opposite charge
    df_2mu = df.Filter("nMuon == 2", "Events with exactly two muons")
    df_os = df_2mu.Filter("Muon_charge[0] != Muon_charge[1]", "Muons with opposite charge")

    # Compute invariant mass of the dimuon system
    df_mass = df_os.Define("Dimuon_mass", "InvariantMass(Muon_pt, Muon_eta, Muon_phi, Muon_mass)")

    # Book the histogram; title and axis labels are set in one go.
    h = df_mass.Histo1D(("Dimuon_mass", "Dimuon mass;m_{#mu#mu} (GeV);N_{Events}", 30000, 0.25, 300), "Dimuon_mass")

    # Produce plot
    ROOT.gStyle.SetOptStat(0)
    ROOT.gStyle.SetTextFont(42)
    c = ROOT.TCanvas("c", "", 800, 700)
    c.SetLogx()
    c.SetLogy()

    # Booked actions are lazy: fetching the histogram value is what runs the
    # event loop, so the stopwatch brackets exactly that computation.
    watch = ROOT.TStopwatch()
    hist = h.GetValue()
    hist.SetTitle("")
    print(f"Time elapsed {watch.RealTime()}")
    hist.GetXaxis().SetTitleSize(0.04)
    hist.GetYaxis().SetTitleSize(0.04)
    hist.Draw()

    label = ROOT.TLatex()
    label.SetNDC(True)
    # Resonance labels, hand-placed in normalized (NDC) canvas coordinates.
    resonance_labels = (
        (0.175, 0.740, "#eta"),
        (0.205, 0.775, "#rho,#omega"),
        (0.270, 0.740, "#phi"),
        (0.400, 0.800, "J/#psi"),
        (0.415, 0.670, "#psi'"),
        (0.485, 0.700, "Y(1,2,3S)"),
        (0.755, 0.680, "Z"),
    )
    for x_ndc, y_ndc, text in resonance_labels:
        label.DrawLatex(x_ndc, y_ndc, text)
    label.SetTextSize(0.040)
    label.DrawLatex(0.100, 0.920, "#bf{CMS Open Data}")
    label.SetTextSize(0.030)
    label.DrawLatex(0.630, 0.920, "#sqrt{s} = 8 TeV, L_{int} = 11.6 fb^{-1}")

    suffix = "" if node_count is None else f"_{node_count}"
    c.SaveAs(f"dimuon_spectrum{suffix}.pdf")
    
def cpubound(df, folder, node_count=None, nops_percol=10):
    """Run a synthetic CPU-bound benchmark on ``df`` and record its wall time.

    Defines three random columns with ``gRandom`` (Gaussian ``x``, exponential
    ``y``, Poisson ``z``) and books ``nops_percol`` ``Mean`` actions per column.
    Fetching the first result executes the whole computation graph; that
    execution is what gets timed.

    Parameters
    ----------
    df : RDataFrame
        Dataframe (local or distributed) to define the columns on; presumably an
        empty-source ``RDataFrame(nentries)`` as in the benchmark cell.
    folder : str
        Existing directory the timing CSV is written into.
    node_count : optional
        When given, the time is appended to ``distrdf_cpubound_{node_count}.csv``
        so repeated runs with the same partition count accumulate in one file
        (one time per line). When omitted, the file is named after the measured
        time (``distrdf_cpubound_{realtime}.csv``), as before.
    nops_percol : int, default 10
        Number of ``Mean`` actions per column; larger values lengthen the run.

    Returns
    -------
    float
        Elapsed real time in seconds, rounded to two decimals.
    """
    # Decide parameters of the random distributions of the RDF columns
    gaus_mean = 10
    gaus_sigma = 1
    exp_tau = 20
    poisson_mean = 30

    df_withcols = (df.Define("x", f"gRandom->Gaus({gaus_mean},{gaus_sigma})")
                     .Define("y", f"gRandom->Exp({exp_tau})")
                     .Define("z", f"gRandom->PoissonD({poisson_mean})"))

    # Book several identical actions per column to scale the amount of work.
    oplist = [df_withcols.Mean(colname)
              for colname in ["x", "y", "z"]
              for _ in range(nops_percol)]

    # Start a stopwatch and trigger the execution of the computation graph.
    # Asking for the first value in the list is enough to trigger everything.
    print("Starting the CPU bound benchmark.")
    t = ROOT.TStopwatch()
    oplist[0].GetValue()
    realtime = round(t.RealTime(), 2)
    print(f"CPU bound benchmark finished in {realtime} seconds.")

    # Key the output file by configuration when one is supplied, so timings can
    # be attributed to the run that produced them.
    tag = realtime if node_count is None else node_count
    outcsv = os.path.join(folder, f"distrdf_cpubound_{tag}.csv")

    with open(outcsv, "a+") as f:
        f.write(f"{realtime}\n")

    return realtime


In [2]:
import ROOT
import os

# Plot generation
import csv
import sys
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Client configuration handed to the OSCAR RDataFrame. Endpoints and credentials
# are read from environment variables so they never appear in the notebook.
oscarclient = dict(
    minio_endpoint=os.environ['minio_endpoint'],
    minio_access=os.environ['minio_access'],
    minio_secret=os.environ['minio_secret'],
    bucket_name='root-oscar',
    # Optional arguments
    benchmarking=True,
    oscar_endpoint=os.environ['oscar_endpoint'],
    oscar_access=os.environ['oscar_access'],
    oscar_secret=os.environ['oscar_secret'],
)

# Candidate locations of the dimuon input file.
minio_data = 'https://158.42.106.12:30300/root-common/dimuon_data.root'
# aws_data = 'https://test-cern-data.s3.amazonaws.com/dimuon_data.root'
cern_data = "root://eospublic.cern.ch//eos/opendata/cms/derived-data/AOD2NanoAODOutreachTool/Run2012BC_DoubleMuParked_Muons.root"

treename = "Events"
# The same file listed 100 times to enlarge the workload.
filenames = [cern_data for _ in range(100)]

RDataFrame = ROOT.RDF.Experimental.Distributed.OSCAR.RDataFrame
# RDataFrame = ROOT.RDataFrame  # local, non-distributed alternative

Welcome to JupyROOT 6.27/01


In [3]:
# Synthetic CPU-bound benchmark: rerun the same workload with a decreasing
# number of partitions/mappers and store each wall time under experiment_name.
nentries = int(1e9)

experiment_name = 'cpu_bound_0.98'
# exist_ok lets the cell be re-run (e.g. after a failed attempt) instead of
# raising FileExistsError on the second execution.
os.makedirs(experiment_name, exist_ok=True)
oscarclient['folder'] = experiment_name


for node_count in [16, 8, 4, 2, 1]:
    oscarclient['mapper_count'] = node_count
    # To benchmark the dimuon analysis instead of the synthetic workload:
    #   df = RDataFrame(treename, filenames,
    #                   oscarclient=oscarclient | {'node_count': node_count},
    #                   npartitions=node_count)
    #   dimuon_analysis(df, node_count)
    df = RDataFrame(nentries, oscarclient=oscarclient, npartitions=node_count)
    cpubound(df, experiment_name)

root-oscar-1dc3149c-7991-4b98-af3b-d9a52e99d293-benchmark
Bucket does not exist. Trying to create it.
Creating bucket...
Bucket created!
Creating services...
Creating service mapper for root-oscar-1dc3149c-7991-4b98-af3b-d9a52e99d293-benchmark
root-oscar-1dc3149c-7991-4b98-af3b-d9a52e99d293-benchmark
CPU = 0.98
Creating service reducer for root-oscar-1dc3149c-7991-4b98-af3b-d9a52e99d293-benchmark
root-oscar-1dc3149c-7991-4b98-af3b-d9a52e99d293-benchmark
CPU = 0.98
Done creating services!
Starting the CPU bound benchmark.
['0_0-1_1', '2_2-3_3', '4_4-5_5', '6_6-7_7', '8_8-9_9', '10_10-11_11', '12_12-13_13', '14_14-15_15', '0_1-2_3', '4_5-6_7', '8_9-10_11', '12_13-14_15', '0_3-4_7', '8_11-12_15', '0_7-8_15']
root-oscar-1dc3149c-7991-4b98-af3b-d9a52e99d293-benchmark
<Response [201]>
<Response [201]>
0   0   62500000
1   62500000   125000000
2   125000000   187500000
3   187500000   250000000
4   250000000   312500000
5   312500000   375000000
6   375000000   437500000
7   437500000   50000

MaxRetryError: HTTPSConnectionPool(host='158.42.106.12', port=30300): Max retries exceeded with url: http://158.42.106.12:30300/root-oscar-fd278d41-8a2c-486c-a384-541668b0a1c1-benchmark/partial-results/0_0 (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x40bb1e4580>: Failed to establish a new connection: [Errno 111] Connection refused')))

In [4]:
# NOTE(review): `count_uuid` is not defined in any visible cell, so this raises
# NameError on a fresh kernel. The output saved below looks like a benchmark
# run's log rather than a single printed value; confirm where this name comes from.
print(str(count_uuid))

root-oscar-264285c4-098b-433f-90ae-e6c91eb68716-benchmark
Bucket does not exist. Trying to create it.
Creating bucket...
Bucket created!
Creating services...
Creating service mapper for root-oscar-264285c4-098b-433f-90ae-e6c91eb68716-benchmark
root-oscar-264285c4-098b-433f-90ae-e6c91eb68716-benchmark
Creating service reducer for root-oscar-264285c4-098b-433f-90ae-e6c91eb68716-benchmark
root-oscar-264285c4-098b-433f-90ae-e6c91eb68716-benchmark
Done creating services!
Client headnode
Starting the CPU bound benchmark.
['0_0-1_1', '2_2-3_3', '4_4-5_5', '6_6-7_7', '8_8-9_9', '10_10-11_11', '12_12-13_13', '14_14-15_15', '0_1-2_3', '4_5-6_7', '8_9-10_11', '12_13-14_15', '0_3-4_7', '8_11-12_15', '0_7-8_15']
root-oscar-264285c4-098b-433f-90ae-e6c91eb68716-benchmark
<Response [201]>
<Response [201]>
Target Name: 0_15
Waiting for final result 0_15.
264285c4-098b-433f-90ae-e6c91eb68716_process.csv
264285c4-098b-433f-90ae-e6c91eb68716_usage.csv
CPU bound benchmark finished in 158.09 seconds.



In [22]:
# Collect the CSV file names printed by a captured benchmark run (one per line).
# NOTE(review): `file_names1` is not defined in any visible cell; presumably it
# came from a `%%capture file_names1` cell that has since been removed. Confirm.
file_names = [line for line in str(file_names1).splitlines() if line.endswith('.csv')]

# TODO: change names (step left unfinished).
file_names



['48b30b4f-224b-4fde-b439-fa074b9d2f3c_process.csv', '48b30b4f-224b-4fde-b439-fa074b9d2f3c_usage.csv']


In [27]:
%%capture test
# Scratch check of %%capture: this cell's stdout is stored in `test` instead of
# being displayed. Note it also rebinds the global `node_count`.
for node_count in [1,2,3]:
    print(node_count)

In [28]:
# Show the stdout captured by the previous cell (print() already applies str()).
print(test)

1
2
3



## Plotting

In [None]:
df = pd.read_csv('MinioData/96_usage.csv' , delimiter='|')

# Per-mapper CPU and memory traces from the usage table, one panel per mapper.
mapper_ids = ['0_0', '1_1', '2_2', '3_3']
n_grid_cols = 2
# Ceiling division: enough rows to hold every mapper panel.
n_grid_rows = (len(mapper_ids) + n_grid_cols - 1) // n_grid_cols
# Titles are derived from the id list so panels and labels cannot drift apart.
subplot_titles = [f'Mapper {index}' for index in range(len(mapper_ids))]

mappers = df[df['function'] == 'mapper']

cpu_figs = make_subplots(rows=n_grid_rows, cols=n_grid_cols,
                         subplot_titles=subplot_titles,
                         x_title='Time (s)', y_title='CPU Usage (%)')

mem_figs = make_subplots(rows=n_grid_rows, cols=n_grid_cols,
                         subplot_titles=subplot_titles,
                         x_title='Time (s)', y_title='Memory Usage (%)')

# `mapper_id` rather than `id` so the builtin is not shadowed at notebook scope.
for index, mapper_id in enumerate(mapper_ids):
    mapper_data = mappers[mappers['id'] == mapper_id]
    # Fill the grid row by row; plotly rows/cols are 1-based.
    grid_row, grid_col = divmod(index, n_grid_cols)
    row, col = grid_row + 1, grid_col + 1

    cpu_figs.add_trace(
        go.Scatter(x=mapper_data['time'],
                   y=mapper_data['cpu_percent'],
                   showlegend=False),
        row=row, col=col
    )

    mem_figs.add_trace(
        go.Scatter(x=mapper_data['time'],
                   y=mapper_data['mem_percent'],
                   showlegend=False),
        row=row, col=col
    )

cpu_figs.show()
mem_figs.show()

In [None]:
df = pd.read_csv('48b30b4f-224b-4fde-b439-fa074b9d2f3c_process.csv', sep='|')

# Keep only mapper rows, then their identifying columns plus 'mem_'-prefixed metrics.
mapper = df.loc[df['function'] == 'mapper']
mem_metric_cols = [name for name in mapper.columns if name.startswith('mem_')]
mem_cols = ['function', 'id', 'phase'] + mem_metric_cols
mapper_mem = mapper[mem_cols]
mapper_mem

In [None]:
# Resource-usage samples of run 11768b06, restricted to mapper invocations.
# Bind only `df2`: the chained `df2 = df = ...` form would also silently
# rebind `df` to the same frame, clobbering whatever it held before.
df2 = pd.read_csv('11768b06-1565-47cf-9b31-e8eff20b9d03_usage.csv', delimiter='|')
usage = df2[df2['function'] == 'mapper']
usage

In [27]:
%%capture dsa
# Scratch check: both printed lines are captured into `dsa` rather than displayed.
print('asd')
print('dsa')

In [28]:
# Split the captured stdout into one list entry per printed line.
str(dsa).splitlines()

['asd', 'dsa']