In [2]:
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkConf

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

findspark.init()

def demarrer_spark():
    local = "local[*]"
    appName = "dataframeJson"
    config = SparkConf().setAppName(appName).setMaster(local).\
    set("spark.executor.memory", "1G").\
    set("spark.driver.memory", "1G").\
    set("spark.sql.catalogImplementation", "in-memory")
    
    spark = SparkSession.builder.config(conf = config).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    
    spark.conf.set("spark.sql.shuffle.partitions", "4")
    return spark

spark = demarrer_spark()
print("running")

running


hdfs dfs -mkdir -p /user/maxime  
hdfs dfs -copyFromLocal vk_001.json /user/maxime

In [4]:
data = spark.read.format("json").load("vk_001.json").distinct()
data.count()

                                                                                

30683

In [5]:
data.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- code: long (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- attachments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- album: struct (nullable = true)
 |    |    |    |    |-- created: long (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- owner_id: long (nullable = true)
 |    |    |    |    |-- size: long (nullable = true)
 |    |    |    |    |-- thumb: struct (nullable = true)
 |    |    |    |    |    |-- access_key: string (nullable = true)
 |    |    |    |    |    |-- album_id: long (nullable = true)
 |    |    |    |    |    |-- date: long (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- id: long (nullable = true)
 | 

The schema indicates each attribute with nullable=true which is not very informative since we can not know whether a field has missing values. The following instructions count the number of rows w/o missing values for some specific fields of interest.

In [6]:
att_list = [
    "event",
    "event.event_id",
    "event.event_id.post_id",
    "event.event_id.post_owner_id",
    "event.event_id.comment_id",
    "event.event_id.shared_post_id",
    "event.author",
    "event.attachments",
    "event.geo",
    "event.tags",
    "event.creation_time"
]

In [24]:
for att in att_list:
    print("count of %s=%s"%(att, data.where(att+ " is not null").count()))



count of event=30683
count of event.event_id=30683
count of event.event_id.post_id=30683
count of event.event_id.post_owner_id=30683
count of event.event_id.comment_id=16518




count of event.event_id.shared_post_id=638
count of event.author=30683
count of event.attachments=15944
count of event.geo=22
count of event.tags=30683
count of event.creation_time=30683


Check that `event.event_id.comment_id` is present only when `event.event_type='comment'`

In [8]:
data.select("*").where((data.event.event_id.comment_id.isNotNull()) & (data.event.event_type != 'comment')).count() == 0

True

Check that `event.event_id.shared_post_id` is present only when `event.event_type='share'`

In [9]:
data.select("*").where((data.event.event_id.shared_post_id.isNotNull()) & (data.event.event_type != 'share')).count() == 0

True

How many distinct post ids are there ?

In [10]:
from pyspark.sql.functions import count, countDistinct

data.select(countDistinct(data.event.event_id.post_id).alias("distinct post ids")).show()

+-----------------+
|distinct post ids|
+-----------------+
|            21683|
+-----------------+





How many distinct post_ids per event type 

In [11]:
data.groupBy(col("event.event_type").alias("event_type")).agg(countDistinct(data.event.event_id.post_id).alias("count_post_id")).show()

+----------+-------------+
|event_type|count_post_id|
+----------+-------------+
|      post|         8137|
|     share|          544|
|   comment|        14202|
+----------+-------------+



Flattening lists of tags

In `data`, each object is associated with an array of tags accessed from `event.tags` (see the schema). Write an instruction to add a `tag` column containing a single tag obtained by flattening the `tags` array

In [12]:
data_with_tags = data.select("_id", "code", "event", explode(data.event.tags).alias("tag"))
data_with_tags.show()



+--------------------+----+--------------------+---------+
|                 _id|code|               event|      tag|
+--------------------+----+--------------------+---------+
|{5a66276e7f254c35...| 100|{new, null, {http...| grudinin|
|{5a6628c37f254c35...| 100|{new, [{null, nul...|    putin|
|{5a66292b7f254c35...| 100|{new, [{null, nul...|    putin|
|{5a66296f7f254c35...| 100|{new, null, {http...|    putin|
|{5a6629907f254c35...| 100|{new, null, {http...|  sobchak|
|{5a662a137f254c35...| 100|{new, [{null, nul...|    putin|
|{5a662b387f254c35...| 100|{new, [{null, nul...| grudinin|
|{5a662dadd81e7c3b...| 100|{new, [{null, nul...|    putin|
|{5a662e03d81e7c3b...| 100|{new, null, {http...|    putin|
|{5a662e5cd81e7c3b...| 100|{new, [{null, nul...|yavlinsky|
|{5a662fe3d81e7c3b...| 100|{new, [{null, nul...| grudinin|
|{5a662ff3d81e7c3b...| 100|{new, null, {http...| grudinin|
|{5a6631c5d81e7c3b...| 100|{new, [{null, nul...|  navalny|
|{5a663292d81e7c3b...| 100|{new, null, {http...| grudini

                                                                                

Return the number of distinct post_id per tag. Sort in descending order of count

In [13]:
data_with_tags.groupBy("tag").agg(countDistinct("event.event_id.post_id").alias("count")).orderBy(desc("count")).show()

+-----------+-----+
|        tag|count|
+-----------+-----+
|      putin|14859|
|   grudinin| 6222|
|    navalny| 2616|
|    sobchak| 2134|
|zhirinovsky| 1231|
|      titov|  577|
|  yavlinsky|  361|
+-----------+-----+



Return the number of distinct author.id per tag. Sort in descending order of count

In [14]:
data_with_tags.groupBy("tag").agg(countDistinct("event.author.id").alias("count")).orderBy(desc("count")).show()

data_author_id_tags = data_with_tags.groupBy(col("tag").alias("name")).agg(countDistinct("event.author.id").alias("count")).orderBy(desc("count"))
data_author_id_tags

+-----------+-----+
|        tag|count|
+-----------+-----+
|      putin|15673|
|   grudinin| 6207|
|    navalny| 2580|
|    sobchak| 2288|
|zhirinovsky| 1214|
|      titov|  572|
|  yavlinsky|  347|
+-----------+-----+



DataFrame[name: string, count: bigint]

Fact checking using Wikipedia

Observe that each tag corresponds to a candidate of the RU2018 Elections (Putin, Titov, etc). We would like to check the relationship between the count of tags per author and the number of votes associated to each candidate. We collet, from Wikipedia, the number of votes per candidates and make it available through the dataset `votes` defined as follows.

In [15]:
from pyspark.sql.types import *

schema_votes = StructType([StructField("name", StringType()),\
                           StructField("party", StringType()),\
                           StructField("votes", LongType())])

raw_votes = [("putin", "Independent", 56430712),\
             ("grudinin", "Communist", 8659206),\
             ("zhirinovsky", "Liberal Democratic Party", 4154985),\
             ("sobchak", "Civic Initiative", 1238031),\
             ("yavlinksky", "Yabloko", 769644),\
             ("titov", "Party of Growth", 556801)\
            ]

votes = spark.createDataFrame(raw_votes, schema_votes)
votes.show(truncate=False)

[Stage 114:>                                                        (0 + 1) / 1]                                                                                

+-----------+------------------------+--------+
|name       |party                   |votes   |
+-----------+------------------------+--------+
|putin      |Independent             |56430712|
|grudinin   |Communist               |8659206 |
|zhirinovsky|Liberal Democratic Party|4154985 |
|sobchak    |Civic Initiative        |1238031 |
|yavlinksky |Yabloko                 |769644  |
|titov      |Party of Growth         |556801  |
+-----------+------------------------+--------+



Return for each candidate the number of its votes and the number of authors who tagged it.

In [16]:
votes_count = votes.join(data_author_id_tags, "name").orderBy(desc("votes"))
votes_count.show()

+-----------+--------------------+--------+-----+
|       name|               party|   votes|count|
+-----------+--------------------+--------+-----+
|      putin|         Independent|56430712|15673|
|   grudinin|           Communist| 8659206| 6207|
|zhirinovsky|Liberal Democrati...| 4154985| 1214|
|    sobchak|    Civic Initiative| 1238031| 2288|
|      titov|     Party of Growth|  556801|  572|
+-----------+--------------------+--------+-----+



Multidimensional aggregation

We want to create a cude with three dimensions: `tag`, `event_type` and `month`. While the first two attributes are already available, the month column must be extracted from the `creation_time` attribute using a built-in function.

Create a dataset `data_tag_mon` obtained by augmenting `data_with_tags` with an attribute`month` containing the month extracted from the `event.creation_time` attribute.

In [17]:
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import month

data_tag_mon = data.select(month(from_unixtime("event.creation_time")).alias("month"))
data_tag_mon.select("month").distinct().show()

+-----+
|month|
+-----+
|    1|
|    2|
|    3|
+-----+



[Stage 129:====>                                                  (1 + 11) / 12]                                                                                

For each combination of event_type, tag and month, count the number of distinct post_ids

In [18]:
tag_event_month = data.select("event.event_type",\
                              explode("event.tags").alias("tag"),\
                              month(from_unixtime("event.creation_time")).alias("month"),\
                              "event.event_id.post_id")\
                      .groupBy("event_type", "tag", "month").agg(countDistinct("post_id").alias("count_dist_postids"))

tag_event_month.orderBy(desc("count_dist_postids")).show()



+----------+--------+-----+------------------+
|event_type|     tag|month|count_dist_postids|
+----------+--------+-----+------------------+
|   comment|   putin|    2|              4194|
|   comment|   putin|    3|              3543|
|      post|   putin|    2|              2965|
|      post|   putin|    3|              2954|
|   comment|grudinin|    2|              2013|
|   comment|grudinin|    3|              1484|
|      post|grudinin|    2|              1409|
|   comment|   putin|    1|              1387|
|      post|grudinin|    3|              1206|
|      post|   putin|    1|              1101|
|   comment| navalny|    2|               780|
|   comment| sobchak|    3|               538|
|      post| sobchak|    2|               502|
|   comment| navalny|    1|               474|
|      post| navalny|    2|               461|
|   comment|grudinin|    1|               441|
|   comment| sobchak|    2|               437|
|      post| sobchak|    3|               426|
|   comment| 

                                                                                

Using the tag_event_month table, create a pivot table by reducing the dimensions to month and event type.

In [21]:
month_event_type = tag_event_month.groupBy("month")\
                                  .pivot("event_type")\
                                  .sum()\
                                  .select("month",\
                                          col("comment_sum(count_dist_postids)").alias("comment"),\
                                          col("post_sum(count_dist_postids)").alias("post"),\
                                          col("share_sum(count_dist_postids)").alias("share"))

month_event_type.printSchema()
month_event_type.show()

                                                                                

root
 |-- month: integer (nullable = true)
 |-- comment: long (nullable = true)
 |-- post: long (nullable = true)
 |-- share: long (nullable = true)

+-----+-------+----+-----+
|month|comment|post|share|
+-----+-------+----+-----+
|    2|   7878|5863|  373|
|    1|   2559|1983|   99|
|    3|   6424|5330|  207|
+-----+-------+----+-----+



create a dataframe indicating for each pair of distinct tags the author that uses both of them

In [22]:
data_with_tags = data.select(col("event.author.id").alias("authorID"), explode("event.tags").alias("otherTag")).crossJoin(data.select(col("event.author.id").alias("authorID2"), explode("event.tags").alias("tag")))

tag_co_occur = data_with_tags.where((col("authorID") == col("authorID2")) & (col("tag") != col("otherTag"))).select("authorID", "otherTag", "tag")
tag_co_occur.show()

+----------+-----------+-----------+
|  authorID|   otherTag|        tag|
+----------+-----------+-----------+
|-163732739|    navalny|      putin|
|-163732739|    navalny|    sobchak|
|-163732739|      putin|    navalny|
|-163732739|      putin|    sobchak|
|-163732739|    sobchak|    navalny|
|-163732739|    sobchak|      putin|
|-163685747|   grudinin|      putin|
|-163685747|   grudinin|    sobchak|
|-163685747|      putin|   grudinin|
|-163685747|      putin|    sobchak|
|-163685747|    sobchak|   grudinin|
|-163685747|    sobchak|      putin|
|-163455107|    navalny|      putin|
|-163455107|      putin|    navalny|
|-163409699|  yavlinsky|zhirinovsky|
|-163409699|zhirinovsky|  yavlinsky|
|-163370734|      putin|    sobchak|
|-163370734|    sobchak|      putin|
|-163322498|   grudinin|      putin|
|-163322498|   grudinin|    sobchak|
+----------+-----------+-----------+
only showing top 20 rows



Build the tag co-occurence matrix indicating for each pair of tags the number of authors that use them

In [23]:
co_occur_mat = tag_co_occur.groupBy("tag").pivot("otherTag").count()
co_occur_mat.show()

                                                                                

+-----------+--------+-------+-----+-------+-----+---------+-----------+
|        tag|grudinin|navalny|putin|sobchak|titov|yavlinsky|zhirinovsky|
+-----------+--------+-------+-----+-------+-----+---------+-----------+
|  yavlinsky|     210|     60|  355|    199|   94|     null|        145|
|      titov|     140|     21|  246|    111| null|       94|         90|
|zhirinovsky|     714|     90|  901|    439|   90|      145|       null|
|    navalny|    1113|   null| 1806|    300|   21|       60|         90|
|      putin|    7267|   1806| null|   1299|  246|      355|        901|
|   grudinin|    null|   1113| 7267|    722|  140|      210|        714|
|    sobchak|     722|    300| 1299|   null|  111|      199|        439|
+-----------+--------+-------+-----+-------+-----+---------+-----------+

