* Master DAC, BDLE, 2021 
* Author: Mohamed-Amine Baazizi
* Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
* Email: mohamed-amine.baazizi@lip6.fr

# Querying nested data 

This lab session complements the previous session with several concepts:
* manipulating irregular, nested data represented in JSON format
* using windowing functions, multi-dimensional aggregations, and pivot tables



```
# This is formatted as code
```

## Préparation

Vérifier que des ressources de calcul sont allouées à votre notebook est connecté (cf RAM  de disque indiqués en haut à droite) . Sinon cliquer sur le bouton connecter pour obtenir des ressources.




Pour accéder directement aux fichiers stockées sur votre google drive. Renseigner le code d'authentification lorsqu'il est demandé

Ajuster le nom de votre dossier : MyDrive/ens/bdle/dir. **Remplacer dir **

In [None]:
import os
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

drive_dir = "/content/drive/MyDrive/ens/bdle/SparkDF/"
os.makedirs(drive_dir, exist_ok=True)
os.listdir(drive_dir)

Mounted at /content/drive


['books.csv', 'ratings.csv', 'users.csv', 'vk_001.json']

Installer pyspark et findspark :


In [None]:
!pip install -q pyspark
!pip install -q findspark

[K     |████████████████████████████████| 281.3 MB 48 kB/s 
[K     |████████████████████████████████| 198 kB 76.3 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


Démarrer la session spark

In [None]:
import os
# !find /usr/local -name "pyspark"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.7/dist-packages/pyspark"
os.environ["JAVA_HOME"] = "/usr"

In [None]:
# Principaux import
import findspark
from pyspark.sql import SparkSession 
from pyspark import SparkConf  

# pour les dataframe et udf
from pyspark.sql import *  
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *

# pour le chronomètre
import time

# initialise les variables d'environnement pour spark
findspark.init()

# Démarrage session spark 
# --------------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory")
  
  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")
  
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

  # On ajuste l'environnement d'exécution des requêtes à la taille du cluster (4 coeurs)
  spark.conf.set("spark.sql.shuffle.partitions","4")    
  print("session démarrée, son id est ", sc.applicationId)
  return spark
spark = demarrer_spark()

session démarrée, son id est  local-1636728713757


In [None]:
# on utilise 8 partitions au lieu de 200 par défaut
spark.conf.set("spark.sql.shuffle.partitions", "8")
print("Nombre de partitions utilisées : ", spark.conf.get("spark.sql.shuffle.partitions"))

Nombre de partitions utilisées :  8


## Data loading

In [None]:
# URL du dossier PUBLIC_DATASET contenant des fichiers de données pour les TP
# ---------------------------------------------------------------------------
# en cas de problème avec le téléchargement des datasets, aller directement sur l'URL ci-dessous
PUBLIC_DATASET_URL = "https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4" 
PUBLIC_DATASET=PUBLIC_DATASET_URL + "/download?path="

print("URL pour les datasets ", PUBLIC_DATASET_URL)

URL pour les datasets  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4


In [None]:
import os
from urllib import request

def load_file(file,dir):
  if(os.path.isfile(file)):
    print(file, "is already stored")
  else:
    url = PUBLIC_DATASET + "/"+ dir + "/" + file
    print("downloading from URL: ", url, "save in : " + drive_dir   + file)
    request.urlretrieve(url , drive_dir + file)

load_file("vk_001.json", "VKRU18")

# Liste des fichiers de IMDB
os.listdir(drive_dir)

downloading from URL:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4/download?path=/VKRU18/vk_001.json save in : /content/drive/MyDrive/ens/bdle/SparkDF/vk_001.json


['books.csv', 'ratings.csv', 'users.csv', 'vk_001.json']

## Dataset loading
The dataset used in this lab session describes posts exchanged in a Russian social network, called VK, during the 2018 Russian elections.
The schema of this dataset is described in the official API website
https://vk.com/dev/streaming_api_docs_2?f=7.%2BЧтение%2Bпотока 


We will use this dataset to perform some analytics on the type of posts, the tags used in these post and the relationships between authors of the posts.

In [None]:
data = spark.read.format("json").load(drive_dir+'vk_001.json').distinct()
data.count()

30683

In [None]:
#examine the schema
data.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- code: long (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- attachments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- album: struct (nullable = true)
 |    |    |    |    |-- created: long (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- owner_id: long (nullable = true)
 |    |    |    |    |-- size: long (nullable = true)
 |    |    |    |    |-- thumb: struct (nullable = true)
 |    |    |    |    |    |-- access_key: string (nullable = true)
 |    |    |    |    |    |-- album_id: long (nullable = true)
 |    |    |    |    |    |-- date: long (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- id: long (nullable = true)
 | 

## Questions

The schema indicates each attribute with nullable=true which is not very informative since we can not know whether a field has missing values.
The following instructions count the number of rows w/o missing values for some specific fields of interest.

In [None]:
att_list = ["event", 
                 "event.event_id", 
                 "event.event_id.post_id", 
                 "event.event_id.post_owner_id", 
                 "event.event_id.comment_id", 
                 "event.event_id.shared_post_id", 
                 "event.author", 
                 "event.attachments", 
                 "event.geo", 
                 "event.tags",
                 "event.creation_time"]

In [None]:
for att in att_list:
    print("count of %s=%s"%(att, data.where(att+ " is not null").count()))

count of event=30683
count of event.event_id=30683
count of event.event_id.post_id=30683
count of event.event_id.post_owner_id=30683
count of event.event_id.comment_id=16518
count of event.event_id.shared_post_id=638
count of event.author=30683
count of event.attachments=15944
count of event.geo=22
count of event.tags=30683
count of event.creation_time=30683


What do you observe? Which attributes have missing values?

Feel free to examine the content of these attributes (count the number of distinct values, examine a sample of distinct values, etc)

<FONT color="red"> We have missing values. "*event.event_id.comment*", "*event.event_id.shared_post_id*", "*event.attachments*" and "*event.geo*".</FONT>

The official documentation (https://vk.com/dev/streaming_api_docs_2?f=7.%2BЧтение%2Bпотока) reports on some assumptions about the presence/absence of values.
For example, the field `event.event_id.comment_id` is present only when `event.event_type='comment'` and `event.event_id.shared_post_id` is present only when `event.event_type='share'`.

Write two queries to verify these assumptions.

#### Check that `event.event_id.comment_id` is present only when `event.event_type='comment'`

In [None]:
data.filter((col('event.event_type') != 'comment') & (col('event.event_id.comment_id').isNotNull())).count() == 0

True

#### Check that `event.event_id.shared_post_id` is present only when `event.event_type='share'`

In [None]:
data.filter((col('event.event_type') != 'share') & (col('event.event_id.shared_post_id').isNotNull())).count() == 0

True

#### How many distinct post ids are there?

In [None]:
from pyspark.sql.functions import count, countDistinct

In [None]:
nb_posts = data.select(countDistinct(col="event.event_id.post_id"))
nb_posts.show()

+--------------------------------------+
|count(DISTINCT event.event_id.post_id)|
+--------------------------------------+
|                                 21683|
+--------------------------------------+



#### How many distinct post_ids per event type

In [None]:
post_per_type = data.select("event.event_type","event.event_id.post_id").distinct().groupBy("event_type").count().select(col("event_type"),col("count").alias("count_objects"))
post_per_type.show()

+----------+-------------+
|event_type|count_objects|
+----------+-------------+
|     share|          544|
|      post|         8137|
|   comment|        14202|
+----------+-------------+



In [None]:
post_per_type = data.groupBy("event.event_type").agg(countDistinct(col="event.event_id.post_id").alias("count_object"))
post_per_type.show()

+----------+------------+
|event_type|count_object|
+----------+------------+
|     share|         544|
|      post|        8137|
|   comment|       14202|
+----------+------------+



#### Flattening lists of tags

In `data`, each object is associated with an array of tags accessed from `event.tags` (see the schema). Write an instruction to add a `tag` column containing a single tag obtained by flattening the `tags` array

In [None]:
from pyspark.sql.functions import explode

In [None]:
att_list = ["event", 
                 "event.event_id", 
                 "event.event_id.post_id", 
                 "event.event_id.post_owner_id", 
                 "event.event_id.comment_id", 
                 "event.event_id.shared_post_id", 
                 "event.author", 
                 "event.attachments", 
                 "event.geo", 
                 "event.tags",
                 "event.creation_time"]

In [None]:
data_with_tags = data.select("_id","code","event",explode("event.tags").alias("tag"))
data_with_tags.show()

+--------------------+----+--------------------+---------+
|                 _id|code|               event|      tag|
+--------------------+----+--------------------+---------+
|{5a66276e7f254c35...| 100|{new, null, {http...| grudinin|
|{5a66296f7f254c35...| 100|{new, null, {http...|    putin|
|{5a6629907f254c35...| 100|{new, null, {http...|  sobchak|
|{5a662a137f254c35...| 100|{new, [{null, nul...|    putin|
|{5a662b387f254c35...| 100|{new, [{null, nul...| grudinin|
|{5a662dadd81e7c3b...| 100|{new, [{null, nul...|    putin|
|{5a662e5cd81e7c3b...| 100|{new, [{null, nul...|yavlinsky|
|{5a662fe3d81e7c3b...| 100|{new, [{null, nul...| grudinin|
|{5a6631c5d81e7c3b...| 100|{new, [{null, nul...|  navalny|
|{5a663292d81e7c3b...| 100|{new, null, {http...| grudinin|
|{5a663292d81e7c3b...| 100|{new, null, {http...|    putin|
|{5a6632bad81e7c3b...| 100|{new, [{null, nul...|  navalny|
|{5a663449d81e7c3b...| 100|{new, null, {http...|    putin|
|{5a68c389713e4d08...| 100|{new, [{null, nul...| grudini

#### Return the number of distinct post_id per tag. Sort in descending order of count

In [None]:
from pyspark.sql.functions import desc

objects_per_tag = data.select(explode("event.tags").alias("tag"),"event.event_id.post_id").groupBy("tag").agg(countDistinct("post_id").alias("nb_posts")).sort(col("nb_posts").desc())
objects_per_tag.show()

+-----------+--------+
|        tag|nb_posts|
+-----------+--------+
|      putin|   14859|
|   grudinin|    6222|
|    navalny|    2616|
|    sobchak|    2134|
|zhirinovsky|    1231|
|      titov|     577|
|  yavlinsky|     361|
+-----------+--------+



#### Return the number of distinct author.id per tag. Sort in descending order of count

In [None]:
authors_per_tag =  data.select(explode("event.tags").alias("tag"),"event.author.id").groupBy("tag").agg(countDistinct("id").alias("nb_authors")).sort(col("nb_authors").desc())

authors_per_tag.show()

+-----------+----------+
|        tag|nb_authors|
+-----------+----------+
|      putin|     15673|
|   grudinin|      6207|
|    navalny|      2580|
|    sobchak|      2288|
|zhirinovsky|      1214|
|      titov|       572|
|  yavlinsky|       347|
+-----------+----------+



#### Fact-checking using Wikipedia

Observe that each tag corresponds to a candidate of the RU2018 Elections (Putin, Titov, etc).
We would like to check the relationship between the count of tags per author and the number of votes associated to each candidate.
We collect, from Wikipedia, the number of votes per candidates and make it available though the dataset `votes` defined as follows.

In [None]:
from pyspark.sql.types import *

schema_votes = StructType([StructField("name", StringType()),\
                     StructField("party", StringType()),\
                     StructField("votes", LongType())])
                     

raw_votes = [("putin", "Independent", 56430712),\
             ("grudinin", "Communist",8659206), \
             ("zhirinovsky","Liberal Democratic Party",4154985),\
             ("sobchak","Civic Initiative",1238031),\
             ("yavlinsky","Yabloko",769644), \
             ("titov","Party of Growth",556801)\
            ]

votes = spark.createDataFrame(raw_votes,schema_votes) 
votes.show()

+-----------+--------------------+--------+
|       name|               party|   votes|
+-----------+--------------------+--------+
|      putin|         Independent|56430712|
|   grudinin|           Communist| 8659206|
|zhirinovsky|Liberal Democrati...| 4154985|
|    sobchak|    Civic Initiative| 1238031|
|  yavlinsky|             Yabloko|  769644|
|      titov|     Party of Growth|  556801|
+-----------+--------------------+--------+



#### Return for each candidate the number of its votes and the number of authors who tagged it. What do you observe?

In [None]:
votes_count =  votes.select("name","votes").join(authors_per_tag,votes.name == authors_per_tag.tag).select("name","votes","nb_authors").sort(desc("nb_authors"))
votes_count.show()

+-----------+--------+----------+
|       name|   votes|nb_authors|
+-----------+--------+----------+
|      putin|56430712|     15673|
|   grudinin| 8659206|      6207|
|    sobchak| 1238031|      2288|
|zhirinovsky| 4154985|      1214|
|      titov|  556801|       572|
|  yavlinsky|  769644|       347|
+-----------+--------+----------+



### Window function

Read about the windowing functions https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank


Define a window specification for the `votes` and `nb_authors` columns

In [None]:
windowSpecVotes = Window.orderBy(desc("votes")).rowsBetween(Window.unboundedPreceding,Window.currentRow)
windowSpecCount = Window.orderBy(desc("nb_authors")).rowsBetween(Window.unboundedPreceding,Window.currentRow)

Using the window specifications, augment `votes_count` with two attributes, `votes_rank` and `nbAuths_ranks`, which rank each candidate  based on the number of votes he received  and the number of authors who tagged a post with his name. 

Hint. The window spans the entire data and does not use any partitioning.

In [None]:
compare = votes_count.select("*",rank().over(windowSpecVotes).alias("votes_rank"),rank().over(windowSpecCount).alias("nb_authors_rank"))
compare.show()

+-----------+--------+----------+----------+---------------+
|       name|   votes|nb_authors|votes_rank|nb_authors_rank|
+-----------+--------+----------+----------+---------------+
|      putin|56430712|     15673|         1|              1|
|   grudinin| 8659206|      6207|         2|              2|
|    sobchak| 1238031|      2288|         4|              3|
|zhirinovsky| 4154985|      1214|         3|              4|
|      titov|  556801|       572|         6|              5|
|  yavlinsky|  769644|       347|         5|              6|
+-----------+--------+----------+----------+---------------+



### Multidimensional aggregation

We want to create a cube with three dimensions: `tag`, `event_type` and `month`. While the first two attributes are already available, the month column must be extracted from the `creation_time` attribute using a built-in function. 

Create a dataset `dataTagMon` obtained by augmenting `data_with_tags` with an attribute `month` containing the month extracted from the `event.creation_time` attribute.

Hint. You will notice that only months 1, 2 and 3 exist in the dataset.

In [None]:
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import month

In [None]:
data_tag_mon = data_with_tags.select("*",month(from_unixtime("event.creation_time")).alias("month"))

data_tag_mon.select("month").distinct().show()

+-----+
|month|
+-----+
|    1|
|    3|
|    2|
+-----+



#### For each combination of event_type, tag and month, count the nomber of distinct post_ids

In [None]:
tag_event_month = data_tag_mon.groupBy("event.event_type","tag","month").agg(countDistinct("event.event_id.post_id").alias("count_dist_postids"))
tag_event_month.orderBy(desc("count_dist_postids")).show()

+----------+--------+-----+------------------+
|event_type|     tag|month|count_dist_postids|
+----------+--------+-----+------------------+
|   comment|   putin|    2|              4191|
|   comment|   putin|    3|              3543|
|      post|   putin|    2|              2963|
|      post|   putin|    3|              2954|
|   comment|grudinin|    2|              2012|
|   comment|grudinin|    3|              1484|
|      post|grudinin|    2|              1409|
|   comment|   putin|    1|              1390|
|      post|grudinin|    3|              1206|
|      post|   putin|    1|              1104|
|   comment| navalny|    2|               780|
|   comment| sobchak|    3|               538|
|      post| sobchak|    2|               503|
|   comment| navalny|    1|               474|
|      post| navalny|    2|               461|
|   comment|grudinin|    1|               442|
|   comment| sobchak|    2|               436|
|      post| sobchak|    3|               425|
|   comment| 

### Pivot table

#### Using the `tag_event_month` table, create a pivot table by reducing the dimensions to month and event type.

In [None]:
month_event_type = tag_event_month.groupBy("month").pivot("event_type").sum("count_dist_postids")
month_event_type.printSchema()
month_event_type.show()

root
 |-- month: integer (nullable = true)
 |-- comment: long (nullable = true)
 |-- post: long (nullable = true)
 |-- share: long (nullable = true)

+-----+-------+----+-----+
|month|comment|post|share|
+-----+-------+----+-----+
|    3|   6424|5328|  207|
|    1|   2564|1986|  100|
|    2|   7873|5863|  372|
+-----+-------+----+-----+



###  Tag co-occurrence matrix

#### create a dataframe indicating for each pair of distinct tags the author that uses both of them.

In [None]:
auth_tag = data_with_tags.select(col("event.author.id").alias("id1"),col("tag"))\
.crossJoin(data_with_tags.select(col("event.author.id").alias("id2"),col("tag").alias("otherTag")))\
.filter((col("id1") == col("id2")) & (col("tag") != col("otherTag")))\
.select(col("id1").alias("authorID"),"tag","otherTag")
auth_tag.show()

+----------+-----------+-----------+
|  authorID|        tag|   otherTag|
+----------+-----------+-----------+
|-163732739|    navalny|      putin|
|-163732739|    navalny|    sobchak|
|-163732739|      putin|    navalny|
|-163732739|      putin|    sobchak|
|-163732739|    sobchak|    navalny|
|-163732739|    sobchak|      putin|
|-163685747|   grudinin|      putin|
|-163685747|   grudinin|    sobchak|
|-163685747|      putin|   grudinin|
|-163685747|      putin|    sobchak|
|-163685747|    sobchak|   grudinin|
|-163685747|    sobchak|      putin|
|-163455107|    navalny|      putin|
|-163455107|      putin|    navalny|
|-163409699|  yavlinsky|zhirinovsky|
|-163409699|zhirinovsky|  yavlinsky|
|-163370734|      putin|    sobchak|
|-163370734|    sobchak|      putin|
|-163322498|   grudinin|      putin|
|-163322498|   grudinin|    sobchak|
+----------+-----------+-----------+
only showing top 20 rows



#### Build the tag co-occurence matrix indicating for each pair of tags the number of authors that use them

In [None]:
co_occur_mat = auth_tag.groupBy("tag").pivot("otherTag").agg(count("tag"))
co_occur_mat.show()

+-----------+--------+-------+-----+-------+-----+---------+-----------+
|        tag|grudinin|navalny|putin|sobchak|titov|yavlinsky|zhirinovsky|
+-----------+--------+-------+-----+-------+-----+---------+-----------+
|      titov|     140|     21|  246|    111| null|       94|         90|
|zhirinovsky|     714|     90|  901|    439|   90|      145|       null|
|    navalny|    1113|   null| 1806|    300|   21|       60|         90|
|    sobchak|     722|    300| 1299|   null|  111|      199|        439|
|  yavlinsky|     210|     60|  355|    199|   94|     null|        145|
|      putin|    7267|   1806| null|   1299|  246|      355|        901|
|   grudinin|    null|   1113| 7267|    722|  140|      210|        714|
+-----------+--------+-------+-----+-------+-----+---------+-----------+



## END