# Exemple pour une journée, avec l'API

In [1]:
# Pointer vers un index et un type.
# Cela crée un DataFrame
acces20160831 = sqlContext.read.format("org.elasticsearch.spark.sql").load("ul-ena-pr-2016.08.31/acces")

In [2]:
# Supprimer la structure geoip car cause problèmes (à investiguer ultérieurement)
# Probablement le type 'ip'?
acces20160831 = acces20160831.drop('geoip')

In [3]:
#  Rendre accessible le DataFrame à l'engin SQL
acces20160831.registerTempTable("acces20160831")

In [4]:
# Si on désire des performances extrêmes, on peut mettre en cache le contenu de ce DataFrame/Table
# La mise en cache est lazy et s'effectue à la première «action» qui sera lente, mais après cela, tout deviendrait rapide!
acces20160831.cache()

DataFrame[@timestamp: timestamp, @version: string, agent: string, beat: struct<hostname:string,name:string>, code_retour: string, commun: struct<code_utilisateur:string,ip_dst_ipv4:string,ip_src:string,ip_src_ipv4:string>, host: string, id_requete: string, input_type: string, message: string, methode_http: string, offset: bigint, port: string, rvl: struct<client:string,index:string,indexer:string,proxy:string>, source: string, tags: array<string>, temps_reponse: bigint, type: string, ul: struct<environnement:string,numero_serveur:string,role_serveur:string,serveur:string,sigle_systeme:string>, url: string, wls: struct<application:string,domaine:string,managed_server:string>]

# Générer une table temporaire par jour

In [5]:
# Directement dans le SQL Context avec une opération de CREATE TEMPORARY TABLE
# On utilise une boucle
for i in range(1,10):        
    i = str(i).zfill(2)
    sqlCtx.sql(
       "CREATE TEMPORARY TABLE acces201609" + i +  
       " USING org.elasticsearch.spark.sql " +  
        "OPTIONS (resource 'ul-ena-pr-2016.09." + i + "/acces', scroll_size '100')" )

In [6]:
# Lister les tables créées
sqlCtx.tableNames()

[u'acces20160831',
 u'acces20160901',
 u'acces20160902',
 u'acces20160903',
 u'acces20160904',
 u'acces20160905',
 u'acces20160906',
 u'acces20160907',
 u'acces20160908',
 u'acces20160909']

# Opérations de base

In [11]:
# Aller chercher les valeurs distinctes pour un champ/objet
%time acces20160831.select("wls").distinct().collect()

CPU times: user 3.6 ms, sys: 3.04 ms, total: 6.64 ms
Wall time: 1.48 s


[Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod02-a')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod04-a')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod03-b')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod01-b')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod01-a')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod05-a')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod04-b')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod03-a')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod05-b')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod02-b'))]

In [12]:
# Faire l'équivalent en SQL
%time sqlCtx.sql("select distinct wls from acces20160831").collect()

CPU times: user 9.18 ms, sys: 2.05 ms, total: 11.2 ms
Wall time: 1.51 s


[Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod02-a')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod04-a')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod03-b')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod01-b')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod01-a')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod05-a')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod04-b')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod03-a')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod05-b')),
 Row(wls=Row(application=u'ena', domaine=u'ena-prod', managed_server=u'ena-server-prod02-b'))]

In [14]:
# Nomre d'accès par utilisateur
%time sqlCtx.sql("select commun.code_utilisateur, count(*) as nb from acces20160831 where commun.code_utilisateur is not null group by commun.code_utilisateur order by nb desc limit 10").collect()

CPU times: user 2.88 ms, sys: 1.35 ms, total: 4.23 ms
Wall time: 2.42 s


[Row(code_utilisateur=u'MP-COM', nb=14645),
 Row(code_utilisateur=u'DEPER', nb=5698),
 Row(code_utilisateur=u'LOAUD27', nb=4698),
 Row(code_utilisateur=u'MFLAL', nb=4697),
 Row(code_utilisateur=u'MCBEL', nb=4187),
 Row(code_utilisateur=u'MILAB', nb=4115),
 Row(code_utilisateur=u'FRHUB6', nb=3541),
 Row(code_utilisateur=u'JUBER229', nb=3534),
 Row(code_utilisateur=u'MDHAM1', nb=3520),
 Row(code_utilisateur=u'NACOT29', nb=3506)]

In [71]:
# Essayer avec une journée qui n'est pas en cache
%time sqlCtx.sql("select commun.code_utilisateur, count(*) as nb from acces20160908 where commun.code_utilisateur is not null group by commun.code_utilisateur order by nb desc limit 10").collect()

CPU times: user 64.9 ms, sys: 18.3 ms, total: 83.2 ms
Wall time: 7min 45s


[Row(code_utilisateur=u'MP-COM', nb=12797),
 Row(code_utilisateur=u'FRGRE102', nb=6198),
 Row(code_utilisateur=u'EMJON2', nb=6037),
 Row(code_utilisateur=u'AAGIG4', nb=5637),
 Row(code_utilisateur=u'ANGRO33', nb=5612),
 Row(code_utilisateur=u'LISIM54', nb=5387),
 Row(code_utilisateur=u'MASAU91', nb=5301),
 Row(code_utilisateur=u'JAEAV', nb=5068),
 Row(code_utilisateur=u'CALAL43', nb=4783),
 Row(code_utilisateur=u'MYLAR37', nb=4642)]