In [None]:
import ROOT
import requests
import findspark
findspark.init('/usr/local/spark')
from pyspark import SparkConf, SparkContext

In [None]:
# check some of the env variables
!env | grep -i spark

In [None]:
# HTTPS URL of the object-store container (the path looks like an OpenStack Swift
# container) that later cells list and read from.
buck = 'https://cloud-areapd.pd.infn.it:5210/swift/v1/AUTH_d2e941ce4b324467b6b3d467a923a9bc/LCPmodB-Y3_CMS_FCNC/'

# Local directory that presumably holds a copy of the same files — confirm.
local = '/data/FCNC/'

# One ROOT file, given relative to the container / data directory, used by the single-file read tests.
tofile = 'SingleMuon_Run2016B_ver1-Nano1June2019_ver1-v1/4F32E8E2-3E72-E84D-A12B-CA4CB942D65C_Skim.root'

#### Trials following the ROOT documentation (distributed RDataFrame on Spark)

In [None]:
# NOTE(review): abandoned attempt to list the container through boto3's S3 API.
# It is kept for reference only; nothing in this cell executes. If revived:
#  - boto3's s3.Bucket(...) / list_objects(Bucket=...) expect a bucket name, not an
#    https URL like `buck`;
#  - a non-AWS endpoint would need an explicit endpoint_url;
#  - `bucket.objects` is a collection (e.g. `bucket.objects.all()`), not a callable.
#import boto3

#s3 = boto3.resource('s3')
#bucket = s3.Bucket(buck)
#cl = boto3.client('s3')
#response = cl.list_objects(Bucket=buck)

#bucket.objects()

In [None]:
# Create a SparkContext object with the right configuration for your Spark cluster.
# getOrCreate() keeps this cell safe to re-run: a plain SparkContext(conf=...) raises
# ValueError when a context is already active in this kernel. If a context is already
# active, getOrCreate() returns it unchanged and `conf` is NOT applied to it.
conf = SparkConf().setAppName('appName').setMaster('spark://10.67.22.59:7077')
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Fetch the container listing over HTTPS; the lines below treat the response body
# as one object name per line.
# NOTE(review): verify=False disables TLS certificate verification (presumably the
# endpoint's certificate is not in the local CA bundle) — confirm this is intended.
file_list_raw = requests.get(buck, verify=False)
# Fail loudly on HTTP errors instead of treating an error page as a list of file names.
file_list_raw.raise_for_status()
# splitlines() plus the blank filter drops the empty entry that a trailing newline
# produces with str.split('\n'), as well as any blank lines in between.
file_list = [line.strip() for line in file_list_raw.text.splitlines() if line.strip()]

In [None]:
# Build ROOT-readable URLs from the container listing and chain them under the
# 'Events' tree name.
# NOTE(review): 's3' + buck yields an 's3https://...' URL, which ROOT presumably hands
# to its S3 client (TS3WebFile) and which may require S3 credentials. The listing itself
# was fetched anonymously over plain https, so `buck + name` may be an alternative if the
# container is public — TODO confirm.
s3_prefix = 's3' + buck

# Rebuild the list instead of rewriting it element by element:
#  - blank entries (e.g. from a trailing newline in the listing) are dropped, since they
#    would otherwise become the bare container URL;
#  - entries that already carry the prefix are kept as-is, so re-running this cell no
#    longer produces 's3s3https://...'.
file_list = [
    name if name.startswith(s3_prefix) else s3_prefix + name
    for name in (entry.strip() for entry in file_list)
    if name
]
assert file_list, 'container listing produced no file names'

chain = ROOT.TChain('Events')
for file_url in file_list:
    chain.AddFile(file_url)

In [None]:
# Bind the name RDataFrame to ROOT's Spark-backed distributed RDataFrame class.
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

# Passing the optional `sparkcontext` keyword ships the computation graph built on
# `df` to the cluster that `sc` is connected to.
df = RDataFrame(chain, sparkcontext=sc)

In [None]:
# Keep events with nMuon > 5 (lazy: nothing runs until a result is requested).
# NOTE: this rebinds `df`, so re-running the cell stacks the same cut again.
df = df.Filter("nMuon > 5")

In [None]:
# Book the Count action, then ask for its value, which triggers the event loop.
n_events = df.Count()
n_events.GetValue()

In [None]:
# Plain (single-process) RDataFrame over the 'Events' tree of the same file list used for the TChain.
df = ROOT.RDataFrame("Events", file_list)

In [None]:
# Display the repr of the current RDataFrame object.
df

In [None]:
# FIXME(review): the author noted this cell does not work; the cause is not established.
# Worth checking first is whether every `file_list` entry is a readable file URL
# (blank lines in the listing; S3 credentials for 's3https://' URLs).
# Book both lazy results before requesting any value, so a single event loop fills both.
h = df.Histo1D("nMuon")
sums = df.Filter("nMuon > 2").Sum("nElectron")

print(sums.GetValue())  # the first GetValue() runs the event loop
h.Draw()

#### Trials with the DIANA-HEP spark-root data source

In [None]:
from pyspark.sql import SparkSession
# from pyspark.sql import SQLContext

In [None]:
# Build (or reuse) a SparkSession on the standalone master. The session pulls in the
# DIANA-HEP spark-root, histogrammar-sparksql and hadoop-aws packages and limits the
# application to 3 cores.
# NOTE(review): getOrCreate() returns a session/context that is already running in this
# kernel (e.g. the SparkContext created earlier). In that case the
# 'spark.jars.packages' setting presumably does not take effect, so stop the old
# context first if spark-root classes are missing.
spark = (
    SparkSession.builder
    .master('spark://10.67.22.59:7077')
    .appName('3rd')
    .config(
        'spark.jars.packages',
        'org.diana-hep:spark-root_2.11:0.1.13,org.diana-hep:histogrammar-sparksql_2.11:1.0.4,org.apache.hadoop:hadoop-aws:2.7.0',
    )
    .config('spark.cores.max', 3)
    .getOrCreate()
)

spark

In [None]:
# Get the SparkContext behind the session (the entry point for the RDD API).
# This rebinds the notebook-wide name `sc`; later cells use this context.
sc = spark.sparkContext
sc

In [None]:
import os

# Read the file from HDFS with the DIANA-HEP spark-root data source.
# The original path 'hdfs:///root' + tofile had no separator and produced
# 'hdfs:///rootSingleMuon_...'; join with an explicit '/'.
# Name the tree explicitly ('Events', as used for the TChain) instead of relying on
# spark-root's default tree choice.
df = (
    spark.read.format('org.dianahep.sparkroot')
    .option('tree', 'Events')
    .load('hdfs:///root/' + tofile)
)

print(df.count())

In [None]:
# Read the same file from the local filesystem with spark-root.
# SQLContext was never imported (its import is commented out), so the original cell
# raised NameError. It has also been superseded by SparkSession, whose reader is the
# equivalent entry point.
# 'file:' + tofile was a relative path, resolved against each executor's working
# directory; anchor it under `local` instead. TODO confirm the file exists at that
# path on every worker node.
(
    spark.read.format('org.dianahep.sparkroot')
    .option('tree', 'Events')
    .load('file://' + local + tofile)
)

In [None]:
!$SPARK_HOME/bin/pyspark --master spark://ip-10.67.22.59:7077 --packages org.apache.hadoop:hadoop-aws:2.7.0

In [None]:
# Read the ROOT file into a Spark DataFrame.
# FIXME(review): `buck` already starts with 'https://', so this path becomes
# 's3://https://cloud-areapd...'. That is not a well-formed S3 URI for any Hadoop
# filesystem; the intended scheme (e.g. s3a://), bucket and endpoint need to be confirmed.
df = spark.read\
    .format('org.dianahep.sparkroot')\
    .load('s3://' + buck + tofile)
# ... and print the number of events
# print(df.count())

#### Verifying the Spark setup

In [None]:
# Sanity check: ship a small Python list to the cluster and count it back (expect 8).
data = list(range(1, 9))

sc.parallelize(data).count()

In [None]:
import random


def inside(p):
    """Draw one uniform point in the unit square; return True if it lies inside the unit circle.

    ``p`` is the element handed over by ``RDD.filter`` and is not used.
    """
    x = random.random()
    y = random.random()
    return x * x + y * y < 1


# Monte Carlo estimate of pi: the fraction of points inside the quarter circle approximates pi/4.
num_samples = 10 ** 6
count = sc.parallelize(range(num_samples)).filter(inside).count()
pi = 4.0 * count / num_samples
print(pi)

#### Stop the SparkContext (the standalone cluster itself keeps running)

In [None]:
# Shut down this notebook's SparkContext and release its executors on the cluster.
sc.stop()