In [None]:
import os

from pyspark.sql import SparkSession

# Create the Spark session on the standalone master.
spark = SparkSession.builder \
    .master("spark://172.20.53.96:7077") \
    .appName("WDC-complete") \
    .config("spark.executor.memory","28g") \
    .config("spark.driver.memory","28g") \
    .getOrCreate()
# Local alternative:
# spark = SparkSession.builder.master("local").appName("WDC-complete").getOrCreate()

# NOTE(review): the spark.worker.cleanup.* keys look like settings of the
# standalone worker daemon rather than of an application; setting them here
# probably has no effect on the cluster -- confirm and move them to the worker
# configuration if so.
spark.conf.set("spark.worker.cleanup.enabled",True)
spark.conf.set("spark.worker.cleanup.interval",1800)
spark.conf.set("spark.worker.cleanup.appDataTtl",3600)
spark.conf.set("spark.sql.shuffle.partitions",1000)

# Connection settings for the S3 storage server that holds the DataCommons files.
# The credentials are read from the environment so that they never live in the
# notebook (or its history). A missing variable raises KeyError immediately
# instead of failing later with an opaque S3 authentication error.
# SECURITY: the key pair that used to be hardcoded here must be considered
# leaked and should be rotated.
endpoint_url = os.environ.get('S3_ENDPOINT_URL', 'https://s3.os-bird.glicid.fr/')
aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID']
aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY']
hadoopConf = spark._jsc.hadoopConfiguration()
hadoopConf.set('fs.s3a.access.key', aws_access_key_id)
hadoopConf.set('fs.s3a.secret.key', aws_secret_access_key)
hadoopConf.set('fs.s3a.endpoint', endpoint_url)
hadoopConf.set('fs.s3a.path.style.access', 'true')
hadoopConf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

# NOTE(review): 'spark.worker.cleanup.enabled' is not an fs.* Hadoop key;
# writing it into the Hadoop configuration is presumably a no-op -- confirm.
hadoopConf.set('spark.worker.cleanup.enabled', 'true')
hadoopConf.set('fs.s3a.committer.name', 'magic')

In [None]:
from pyspark.sql import functions as f

from pyspark.sql.functions import split
from pyspark.sql.functions import col

def _aggregate_psets(frame):
    """Sum ``count`` per ``pset`` and order the result by descending total."""
    return frame.groupby("pset").agg(f.sum("count").alias('count')).sort(f.desc("count"))


# "Before" data set: characteristic sets with their occurrence counts.
readavant = spark.read.option("header",True) \
  .csv("s3a://test-out/wdcfix/**")

csavant = _aggregate_psets(readavant)

csavant.createOrReplaceTempView("CSET_avant")
csavant.show(truncate=150)

# "After" data set.
readapres = spark.read.option("header",True) \
  .csv("s3a://test-out/cset-wdc-2023-fix2/**")

# Fix wrong formatting: strip every "http(s)://" and "www." prefix found in the
# pset string (the pattern is unanchored, so all occurrences are removed).
# The pattern is a raw string so that "\." is not an invalid Python escape.
# Normalisation happens BEFORE grouping so that sets which only differed by
# their URL prefix are merged into a single row instead of appearing as
# duplicate psets in the view.
readapres_normalized = readapres.withColumn(
    "pset",
    f.regexp_replace(f.col("pset"), r"([Hh][Tt][Tt][Pp][Ss]?://)?([Ww]{3}\.)?", ""),
)

csapres = _aggregate_psets(readapres_normalized)

csapres.createOrReplaceTempView("CSET_apres")
csapres.show(truncate=150)

In [None]:
# Computes the count, the average and the coverage for one type of data.
def calculate_countavcov(data, pred):
    """Compute count, average and coverage of the sets that mention ``pred``.

    Args:
        data: Name of a registered table or temp view exposing a ``pset``
            column (space-separated predicates) and a ``count`` column.
        pred: Text looked up as a literal, case-sensitive substring of
            ``pset``.

    Returns:
        A ``Row(type, count, average, coverage)`` whose numeric fields are
        floats. ``count`` is the sum of ``count`` over the matching sets,
        ``average`` is the count-weighted mean number of predicates per set,
        and ``coverage`` is ``average`` divided by the number of distinct
        predicates seen in the matching sets. All three are ``0.0`` when
        nothing matches or when the summed count is zero.
    """
    table = spark.table(data)

    # Column.contains performs a literal substring match through the DataFrame
    # API, so ``pred`` is never spliced into SQL text: no quote escaping is
    # needed, '%' / '_' are not treated as LIKE wildcards, and the value
    # reported in the result row is exactly the one the caller passed in.
    sets = table.where(table["pset"].contains(pred)).select(
        split(table["pset"], " ").alias("pset"),
        "count",
    )

    # Both totals are computed in a single job instead of two separate ones.
    count_sum, count_used = sets.selectExpr(
        "sum(`count`) as count_sum",
        "sum(size(pset) * `count`) as count_used",
    ).first()

    # The sums are NULL when no row matched; a zero total would make the
    # divisions below fail, so it is reported as the same all-zero row.
    if not count_sum or count_used is None:
        return Row(type=pred, count=float(0), average=float(0), coverage=float(0))

    distinct_predicate_count = sets.selectExpr("explode(pset) as predicate").distinct().count()

    average = count_used / count_sum
    coverage = count_used / (count_sum * distinct_predicate_count)

    print(f" {pred}:  ; count = {count_sum} ; average = {average} ; coverage = {coverage}")
    return Row(type=pred, count=float(count_sum), average=float(average), coverage=float(coverage))

In [None]:
from pyspark.sql import Row
import json

# Per-type metric rows for the "before" and the "after" graph.
rowsavant, rowsapres = [], []

# Load the schema.org type names from the JSON file into a list.
with open('schemaTypes.json', 'r') as type_file:
    type_name_list = json.load(type_file)

print(type_name_list)

# For every type, compute the metrics on the "before" view first, then on the
# "after" view.
for pred in type_name_list:
    print(pred)
    for view_name, bucket in (("CSET_avant", rowsavant), ("CSET_apres", rowsapres)):
        bucket.append(calculate_countavcov(view_name, pred))


def _publish(rows, view_name):
    """Build a DataFrame from ``rows``, register it as ``view_name`` and display it."""
    frame = spark.createDataFrame(rows)
    frame.createOrReplaceTempView(view_name)
    frame.show()
    return frame


dfavant = _publish(rowsavant, "avcovavant")
dfapres = _publish(rowsapres, "avcovapres")



In [None]:
# Display both metric tables sorted by average, then both sorted by coverage
# (highest value first, columns not truncated).
for metric in ("average", "coverage"):
    for view_name in ("avcovavant", "avcovapres"):
        spark.sql(f"select * from {view_name} order by {metric} DESC").show(truncate=0)

In [None]:
# Relative evolution (in percent) of count, average and coverage per type
# between the "before" view (avcovavant) and the "after" view (avcovapres).
#
# calculate_countavcov reports a type that is absent from a graph as a row of
# zeros rather than leaving the row out, so a zero "before" count is treated
# like a missing "before" row: the type only exists in the "after" graph.
# A type that disappears (zero "after" count) keeps its numeric -100 result.
# NULLIF guards every denominator, so a zero baseline (for instance a type
# absent from both graphs) yields NULL instead of a division by zero.
# Because each CASE mixes text and numbers, the percentage columns come out as
# strings; downstream queries cast them back to DOUBLE.
covavevolution = spark.sql("""
SELECT
    COALESCE(avcovavant.type, avcovapres.type) AS type,
    avcovavant.count AS count_before,
    avcovapres.count AS count_after,
    CASE
        WHEN avcovavant.type IS NULL OR (avcovavant.count = 0 AND avcovapres.count > 0)
            THEN "This type is only in the after graph"
        WHEN avcovapres.type IS NULL THEN "This type is only in the before graph"
        ELSE (avcovapres.count - avcovavant.count) / NULLIF(avcovavant.count, 0) * 100.0
    END AS percentage_count_evolution,
    avcovavant.average AS average_before,
    avcovapres.average AS average_after,
    CASE
        WHEN avcovavant.type IS NULL OR (avcovavant.count = 0 AND avcovapres.count > 0)
            THEN "This type is only in the after graph"
        WHEN avcovapres.type IS NULL THEN "This type is only in the before graph"
        ELSE (avcovapres.average - avcovavant.average) / NULLIF(avcovavant.average, 0) * 100.0
    END AS percentage_average_evolution,
    avcovavant.coverage AS coverage_before,
    avcovapres.coverage AS coverage_after,
    CASE
        WHEN avcovavant.type IS NULL OR (avcovavant.count = 0 AND avcovapres.count > 0)
            THEN "This type is only in the after graph"
        WHEN avcovapres.type IS NULL THEN "This type is only in the before graph"
        ELSE (avcovapres.coverage - avcovavant.coverage) / NULLIF(avcovavant.coverage, 0) * 100.0
    END AS percentage_coverage_evolution
FROM
    avcovavant
FULL OUTER JOIN
    avcovapres
ON
    avcovavant.type = avcovapres.type
""")
covavevolution.show(truncate=110)
covavevolution.createOrReplaceTempView("covavevolution")

In [None]:
# Save the covavevolution results as CSV (overwriting any previous run), then
# read the files back and display them.
result_path = "s3a://test-out/types/covavevolutionresult"

csv_writer = covavevolution.write.option("header", True).mode("overwrite")
csv_writer.csv(result_path)

read = spark.read.option("header", True).csv(result_path)
read.show(truncate=110)


In [None]:
# Rank the types by a combined score: each percentage column is cast to DOUBLE,
# divided by its maximum over all rows, and the two ratios are added.
ranking_query = """
    SELECT
        *,
        (CAST(percentage_average_evolution AS DOUBLE) / MAX(CAST(percentage_average_evolution AS DOUBLE)) OVER ()) +
        (CAST(percentage_coverage_evolution AS DOUBLE) / MAX(CAST(percentage_coverage_evolution AS DOUBLE)) OVER ()) AS combined_distance
    FROM
        covavevolution
    ORDER BY combined_distance DESC
"""
covavevolutionOrder = spark.sql(ranking_query)
covavevolutionOrder.show(truncate=110)
covavevolutionOrder.createOrReplaceTempView("covavevolutionOrder")

In [None]:
# Show only the type and its two evolution percentages from the ranked view,
# without truncating the columns.
evolution_summary = spark.sql("""
    SELECT type, percentage_average_evolution, percentage_coverage_evolution
    FROM covavevolutionOrder
""")
evolution_summary.show(truncate=0)