# ST446 Distributed Computing for Big Data
## Assignment 2 - PART 1
---

**We highly recommend using GCP, as the data sets used are about 20 GB in total.** Alternatively, you can use your own computer.

## P1: Querying the YAGO semantic knowledge base

YAGO is a semantic knowledge base, derived from Wikipedia, WordNet and GeoNames (in its Version 1). YAGO contains knowledge about more than 10 million entities (like persons, organizations and cities) and contains more than 120 million facts about these entities. You may find more about YAGO [here](https://www.mpi-inf.mpg.de/departments/databases-and-information-systems/research/yago-naga/yago/#c10444).

In this assignment, you are asked to use parts of the YAGO dataset to demonstrate your knowledge about Spark GraphFrames and motif queries. In particular, you are asked to **_use motif queries_** to find out answers to the following queries stated in English:

**A (max points 10)**. _Politicians who are also scientists_ (sorted alphabetically by name of person)

**B (max points 10)**. _Companies whose founders were born in London_ (sorted alphabetically by name of founder)

**C (max points 10)**. _Writers who have won a Nobel Prize (in any discipline)_ (sorted alphabetically by name of person)

**D (max points 10)**. _Nobel prize winners who were born in the same city as their spouses_ (sorted alphabetically by name of person)

**E (max points 10)**. _Politicians that are affiliated with a right-wing party_ (sorted alphabetically by name of person)

Please always show the first 20 entries of the resulting DataFrame and the total count of relevant entries.

---

## 0.1 Get YAGO data

* You will need to download the following datasets that are part of YAGO (see [here](https://www.mpi-inf.mpg.de/departments/databases-and-information-systems/research/yago-naga/yago/downloads/) for more information):

    * A set of relationships between instances (for example, specifying that Emomali Rahmon is the leader of the Military of Tajikistan). Link: http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoFacts.tsv.7z

    * A set of subclass relationships (for example, specifying that *A1086* is *a road in England*, or that *Salmonella Dub* is *a Reggae music group* and also a *New Zealand dub musical group*). Link: http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoTransitiveType.tsv.7z

* Use `wget` to download the data into the master node of your Dataproc cluster (the files are big!).

* Next, you will need to extract `tsv` files from the `7z` archives that you have downloaded. Use the following commands to install `p7zip` on your Dataproc cluster and extract the files. Please note that this can take a while, in particular as `yagoTransitiveType.tsv` is **18GB** large.

* Put the files (`yagoTransitiveType.tsv` and `yagoFacts.tsv`) into the Hadoop file system. 

```
wget http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoFacts.tsv.7z
wget http://resources.mpi-inf.mpg.de/yago-naga/yago3.1/yagoTransitiveType.tsv.7z
sudo apt-get install p7zip-full
7z x yagoTransitiveType.tsv.7z 
7z x yagoFacts.tsv.7z
hadoop fs -put ./ /yago
```

Also, have a look at their first few lines to understand what kind of data they contain (you need this to infer the schemas).

```
head yagoTransitiveType.tsv
head yagoFacts.tsv
```

## 0.2 Read the data into a Spark DataFrame

Please load the data from `yagoFacts.tsv` into a DataFrame called `df` and `yagoTransitiveType.tsv` into a DataFrame called `df_subclasses`.

Have a look at the beginning of the files to understand the schema. Once imported, both DataFrames should have columns labelled as `id`, `subject`, `predicate`, `object` and `value`.
In the case of `yagoTransitiveType.tsv`, some of the predicates can be understood as *is a subclass of* or *is a member of the class*, and the objects can be understood as classes.

In [1]:
# your code => remember to include the schema
from pyspark.sql.types import *

yagotrans_path = 'hdfs://st446-cluster-m/yago/yagoTransitiveType.tsv'
yagofacts_path = 'hdfs://st446-cluster-m/yago/yagoFacts.tsv'

df_schema = StructType([
    StructField("id", StringType(), True),    
    StructField("subject", StringType(), True),
    StructField("predicate", StringType(), True),
    StructField("object", StringType(), True),
    StructField("value", DoubleType(), True)
])


df_subclasses_schema = StructType([
    StructField("id", StringType(), True),    
    StructField("subject", StringType(), True),
    StructField("predicate", StringType(), True),
    StructField("object", StringType(), True),
    StructField("value", DoubleType(), True)
])

df = spark.read.csv(yagofacts_path, header='false', schema=df_schema, sep='\t')
df.createOrReplaceTempView("df")

df_subclasses = spark.read.csv(yagotrans_path, header='false', schema=df_subclasses_schema, sep='\t')
df_subclasses.createOrReplaceTempView("df_subclasses") 

## 0.3 Understand the database schema

Let's look at the schema:

In [136]:
# your code
df.printSchema()
df_subclasses.printSchema()

root
 |-- id: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- predicate: string (nullable = true)
 |-- object: string (nullable = true)
 |-- value: double (nullable = true)

root
 |-- id: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- predicate: string (nullable = true)
 |-- object: string (nullable = true)
 |-- value: double (nullable = true)



The output should be similar to that:

```
YAGO Facts schema:

root
 |-- id: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- predicate: string (nullable = true)
 |-- object: string (nullable = true)
 |-- value: double (nullable = true)

YAGO TransitiveType schema:

root
 |-- id: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- predicate: string (nullable = true)
 |-- object: string (nullable = true)
 |-- value: double (nullable = true)
 ```

The useful information is in columns `subject`, `predicate` and `object`. **predicate** defines the relation between entities **subject** and **object**. For example, for *Albert Einstein was born in Ulm*, `Albert Einstein` is the subject, `was born in` is the predicate and `Ulm` is the object.

## 0.4 Simple query example

To get information about where Albert Einstein was born, we can load data into Spark using the following queries:

In [137]:
# checking for predicate `was born in`
born_city_df = df.where("predicate == '<wasBornIn>'")
born_city_df.show(1)

+--------------------+--------------------+-----------+---------------+-----+
|                  id|             subject|  predicate|         object|value|
+--------------------+--------------------+-----------+---------------+-----+
|<id_thPX9b1zg!_7f...|<William_Jones_(W...|<wasBornIn>|<Penrhiwceiber>| null|
+--------------------+--------------------+-----------+---------------+-----+
only showing top 1 row



In [138]:
# checking for subject
born_city_df.where("subject = '<Albert_Einstein>'").show()

+--------------------+-----------------+-----------+------+-----+
|                  id|          subject|  predicate|object|value|
+--------------------+-----------------+-----------+------+-----+
|<id_sbCVliqDT2_7f...|<Albert_Einstein>|<wasBornIn>| <Ulm>| null|
+--------------------+-----------------+-----------+------+-----+



You may wonder how one would know whether to use the predicate '&lt;wasBornIn&gt;' or '&lt;was_born_in&gt;' and subject '&lt;Albert_Einstein&gt;' or '&lt;AlbertEinstein&gt;'. 

For YAGO subjects (and objects), the naming is aligned with Wikipedia. For example, Albert Einstein's wiki is: https://en.wikipedia.org/wiki/Albert_Einstein and you can see it is 'Albert_Einstein'. 

For predicates, you can look at the "property" list from the [Yago Web interface](https://yago-knowledge.org/) or the documentation on the TAXONOMY theme ([here](https://www.mpi-inf.mpg.de/departments/databases-and-information-systems/research/yago-naga/yago/downloads)).

Try different queries with this Web interface query to understand more how to query YAGO.

## 0.5 Simple motif example

To find out "Which city was Albert Einstein born in?", we can use the following motif query on the first dataframe (`df`):

In [139]:
from graphframes import *

# helper function to filter nodes (vertices) by subject and object, and establish edges
def vertices_edges_split(df, condition):
    sub = df.filter(condition).select("subject").withColumnRenamed("subject","id")
    obj = df.filter(condition).select("object").withColumnRenamed("object","id")
    v = sub.union(obj).distinct()
    e = df.filter(condition).select("subject","object","predicate")\
    .withColumnRenamed("subject","src").withColumnRenamed("object","dst")
    return v, e

# build a graph filtering by Albert Einstein
v, e = vertices_edges_split(df, "subject='<Albert_Einstein>'")
g = GraphFrame(v, e)
# find all relationships where Albert Einstein is the subject...
motifs = g.find("(a)-[e]->(b)")
# ... and the predicate is `was born in`
res = motifs.filter("e.predicate='<wasBornIn>'")
res.show()

+-------------------+--------------------+-------+
|                  a|                   e|      b|
+-------------------+--------------------+-------+
|[<Albert_Einstein>]|[<Albert_Einstein...|[<Ulm>]|
+-------------------+--------------------+-------+



## 0.6 Some useful tips

### Get a subset of YAGO database
YAGO database is large, so we don't try to load the entire database into a dataframe and then query it. If you do this, you will find that you won't even be able to execute `df.take(1)`, as it would take up too much of space (at least on a laptop). Instead, you use Spark SQL commands or `df.where` to get a suitable fraction of the data.

### Try the queries in the YAGO Web interface first
It is sometimes tricky to get the right "subject", "predicate" and "object". It is easier if you start from [Yago Web interface](https://yago-knowledge.org/) rather than directly querying in PySpark. Once your query works, you can convert your query to PySpark code. 

Note that sometimes the Web version of object/subject code may be different from what you need to type here. For example, company code is &lt;wordnet_company_108058098&gt; when you do the query here but when you do it via the web interface it is &lt;wordnet company 108058098&gt;. 

### Be patient and don't do this exercise in the last minute
Some trial and error is needed to get the query right and it may take some time get the result for a query. For these reasons, we advise you not to wait to work out this exercise just before the submission deadline. 

---

## 1. Politicians who are also scientists (Question A)
Find all politicians who are also scientists. Output top 20 of them. How many people are in the dataset who are both scientists and politicians?

Please follow these steps:
* Operate on the subsets of `df_subclasses` where the objects are `'<wordnet_scientist_110560637>` (scientists) and `'<wordnet_politician_110450303>'` (politicians), and where the predicates are `rdf:type`.
* Use graphframes and the right parts of `df_subclasses` to construct a graph whose (directed) edges point from subjects to objects. Hence, its source vertices are subjects and it destination vertices are objects. It may be convenient to use intermediate DataFrames and join all the required dataframes of edges and vertices.
* The subjects will be people and the objects will be classes (e.g., scientists, politicians).
* Use a motif query to find all instances that fulfil the criteria specified in the question.
* It is a good idea to define a function that takes a DataFrame and outputs a set of data frames for vertices and edges.

Please sort the output alphabetically by the person column.

In [3]:
# your code
#Create a subset of the df_subclasses, where the object is scientist.
df_sub_scientists = spark.sql("select * from df_subclasses where object == '<wordnet_scientist_110560637>'")

#Create a subset of the df_subclasses, where the object is politician.
df_sub_politician = spark.sql("select * from df_subclasses where object == '<wordnet_politician_110450303>'")
df_sub_politician.createOrReplaceTempView("df_sub_politician")

#Union the dataframes of scientists and polticians in one
df_sub_scien_polit = df_sub_scientists.union(df_sub_politician)

In [6]:
# build a graph filtering by predicate being rdf:type
from graphframes import *

# helper function to filter nodes (vertices) by subject and object, and establish edges
def vertices_edges_split(df, condition):
    sub = df.filter(condition).select("subject").withColumnRenamed("subject","id")
    obj = df.filter(condition).select("object").withColumnRenamed("object","id")
    v = sub.union(obj).distinct()
    e = df.filter(condition).select("subject","object","predicate")\
    .withColumnRenamed("subject","src").withColumnRenamed("object","dst")
    return v, e

v, e = vertices_edges_split(df_sub_scien_polit, "predicate='rdf:type'")
g = GraphFrame(v, e)

In [7]:
# find all relationships where a person is both a politican and a scientist
motifs = g.find("(a)-[]->(b); (a)-[]->(c)").filter("b.id = '<wordnet_scientist_110560637>'and \
                                                    c.id = '<wordnet_politician_110450303>'")

In [None]:
#Show the total number of politicians that are also scientists
motifs.count()

7182

The total number of politicians that are also scientists is: 7182

In [None]:
# your code
#sort the output alphabetically by the person column.
motifs.sort("a.id",ascending=True).withColumnRenamed("a","person").limit(20).show(truncate=False)

+---------------------------+-------------------------------+--------------------------------+
|person                     |b                              |c                               |
+---------------------------+-------------------------------+--------------------------------+
|[<A._C._Cuza>]             |[<wordnet_scientist_110560637>]|[<wordnet_politician_110450303>]|
|[<A._P._J._Abdul_Kalam>]   |[<wordnet_scientist_110560637>]|[<wordnet_politician_110450303>]|
|[<Aad_Kosto>]              |[<wordnet_scientist_110560637>]|[<wordnet_politician_110450303>]|
|[<Aad_Nuis>]               |[<wordnet_scientist_110560637>]|[<wordnet_politician_110450303>]|
|[<Aaron_Aaronsohn>]        |[<wordnet_scientist_110560637>]|[<wordnet_politician_110450303>]|
|[<Aaron_Farrugia>]         |[<wordnet_scientist_110560637>]|[<wordnet_politician_110450303>]|
|[<Ab_Klink>]               |[<wordnet_scientist_110560637>]|[<wordnet_politician_110450303>]|
|[<Abba_P._Lerner>]         |[<wordnet_scientist_1

## 2. Companies whose founders were born in London (Question B)
For companies, use `'<wordnet_company_108058098>'`. 
For *"being founder"*, use `predicate=<created>`.

By now, you will understand which DataFrame to use for what. 

Set up a graph and use a motif query to find companies whose founders were born in London.
Please take some time to figure out how a suitable configuration of nodes and edges should look like.  How many such companies are there in our dataset?

Please sort the output alphabetically by the founder column.

In [25]:
# your code

#Subset the df_subclasses to get all the subjects that are companies:
df_sub_company = spark.sql("select subject from df_subclasses where object == '<wordnet_company_108058098>' and predicate='rdf:type'")
df_sub_company.createOrReplaceTempView("df_sub_company")

#Subset the df to get all the subjects that are born in London:
df_London = spark.sql("select * from df where object == '<London>' and predicate =='<wasBornIn>'")
df_London.createOrReplaceTempView("df_London")

#Subset the df to get all the relationships that are <created>:
df_created = spark.sql("select * from df where predicate =='<created>'")
df_created.createOrReplaceTempView("df_created")

In [26]:
#From the df get the records where predicate = <created> and object is a company name linked to the df_sub_company
df_companies = spark.sql("select a.* from df_created as a inner join df_sub_company as b where a.object = b.subject")
df_companies.createOrReplaceTempView("df_companies")

#From df_companies get subjects that were born in London
df_comp_London= spark.sql("select c.subject as founder, \
                            c.object as company, \
                            c.predicate as created, \
                            l.predicate as born \
                            from df_companies c inner join df_London as l \
                            where c.subject = l.subject")
df_comp_London.createOrReplaceTempView("df_comp_London")

In [132]:
#Create the graph
sub2 = df_comp_London.select("founder").withColumnRenamed("founder","id")
obj2 = df_comp_London.select("company").withColumnRenamed("company","id")
v2 = sub2.union(obj2).distinct()

e2 = df_comp_London.select("founder", "company")\
.withColumnRenamed("founder","src").withColumnRenamed("company","dst")
#I have set the source to be the company, so that in the output the company appears first, 
#but based on the yago dataset the founder is conceptually correct to be the source

g2 = GraphFrame(v2, e2)

In [133]:
#Create the motif for companies whose founders were born in London
motifs2 = g2.find("(a)-[]->(b)")

In [36]:
#Get the number of entries in the motif
motifs2.count()

61

In [134]:
#Get the number of companies that are in the motif dataset
motifs2.select('b').distinct().count()

59

There are only 59 different companies, but 61 entries

In [135]:
# your code
#Sort the output alphabetically by the founder column and show the top 20 results
motifs2.sort("a.id", ascending=True).withColumnRenamed("a","founder").withColumnRenamed("b","company").show(20, truncate = False) 

+---------------------------------+---------------------------------------+
|founder                          |company                                |
+---------------------------------+---------------------------------------+
|[<Adam_Hamdy>]                   |[<Dare_Comics>]                        |
|[<Alexander_Asseily>]            |[<Jawbone_(company)>]                  |
|[<Antony_Jay>]                   |[<Video_Arts>]                         |
|[<Aubrey_de_Grey>]               |[<SENS_Research_Foundation>]           |
|[<Ben_Horowitz>]                 |[<Andreessen_Horowitz>]                |
|[<Bernard_MacMahon_(filmmaker)>] |[<LO-MAX_Records>]                     |
|[<Brian_Maxwell>]                |[<PowerBar>]                           |
|[<Bruno_Heller>]                 |[<Primrose_Hill_Productions>]          |
|[<Charlie_Chaplin>]              |[<United_Artists>]                     |
|[<Dan_Joyce>]                    |[<Kurrupt_Recordings_HARD>]            |
|[<Daniel_Ja

## 3. Writers who have won a Nobel Prize in any discipline, including economics (Question C)
Tags for nobel prizes look like these: `'<Nobel_Prize_in_Chemistry>`, `<Nobel_Prize_in_Physics>'`, `<Nobel_Prize>` or `<Nobel_Prize>` etc.
We are also counting this one: `'<Nobel_Memorial_Prize_in_Economic_Sciences>'`.

The tag for writers is `'<wordnet_writer_110794014>'`.

You will need to use `'<hasWonPrize>'` as a predicate.

Please sort the output alphabetically by the person column.

In [58]:
# your code

#Get a subset of the df_subset dataframe, where subjects are writers:
df_sub_writers = spark.sql("select subject from df_subclasses where object == '<wordnet_writer_110794014>' and predicate='rdf:type'")
df_sub_writers.createOrReplaceTempView("df_sub_writers")

#From the df get only the records where the subjects are writers
df_writers = spark.sql("select df.* from df inner join df_sub_writers as w where df.subject = w.subject")
df_writers.createOrReplaceTempView("df_writers")

In [59]:
#Create the graph where vertices are the people (the writers) and their objects (including the prizes)
#Edges are the action that a writer has won a prize
v3, e3 = vertices_edges_split(df_writers, "predicate='<hasWonPrize>'")
g3 = GraphFrame(v3, e3)

In [68]:
#Create the motif
motifs3 = g3.find("(a)-[]->(b)").filter("b.id like '%Nobel_Prize%' or b.id == '<Nobel_Memorial_Prize_in_Economic_Sciences>'")

In [None]:
#sort the output alphabetically by the person column
motifs3.sort("a.id", ascending=True).withColumnRenamed("a","writer").withColumnRenamed("b","Nobel Prize").show(20, truncate = False) 

+--------------------------------+---------------------------------------------+
|writer                          |Nobel Prize                                  |
+--------------------------------+---------------------------------------------+
|[<Adrienne_Clarkson>]           |[<Nobel_Prize_in_Physics>]                   |
|[<Albert_Camus>]                |[<Nobel_Prize_in_Literature>]                |
|[<Albert_Einstein>]             |[<Nobel_Prize_in_Physics>]                   |
|[<Aleksandr_Solzhenitsyn>]      |[<Nobel_Prize_in_Literature>]                |
|[<Alexander_Prokhorov>]         |[<Nobel_Prize_in_Physics>]                   |
|[<Alexei_Alexeyevich_Abrikosov>]|[<Nobel_Prize_in_Physics>]                   |
|[<Alexis_Carrel>]               |[<Nobel_Prize_in_Physiology_or_Medicine>]    |
|[<Alfred_Kastler>]              |[<Nobel_Prize_in_Physics>]                   |
|[<Alice_Munro>]                 |[<Nobel_Prize_in_Literature>]                |
|[<Alvin_E._Roth>]          

In [None]:
#Show the total count of the entries in the motif
motifs3.count()

220

## 4. Nobel prize winners who were born in the same city as their spouses (Question D)
You may find the predicate `'<isMarriedTo>'` useful to create a Dataframe of all mariages.
Please also show the cities in which the Nobel laureates and their spouses were born.

Please sort the output alphabetically by the person (prize winner) column.

In [5]:
# your code
#Get dataframe subsets

#Subset the df dataframe df to get Nobel prize winners:
df_nobel_winners = spark.sql("select df.* from df where predicate='<hasWonPrize>' and \
                            (object like '%Nobel_Prize%' or object = '<Nobel_Memorial_Prize_in_Economic_Sciences>')")
df_nobel_winners.createOrReplaceTempView("df_nobel_winners")

#Subset the df dataframe by joining it with the df_nobel_winners to get all the records of Nobel prize winners:
df_nobel_winners_all = spark.sql("select df.* from df inner join df_nobel_winners as n on df.subject =n.subject")
df_nobel_winners_all.createOrReplaceTempView("df_nobel_winners_all")

#Subset the df dataframe of df_nobel_winners_all so that the predicate is '<isMarriedTo>':
df_spouses = spark.sql("select * from df_nobel_winners_all where predicate='<isMarriedTo>'")
df_spouses.createOrReplaceTempView("df_spouses")

df_cities = spark.sql("select * from df where predicate =='<wasBornIn>'")
df_cities.createOrReplaceTempView("df_cities")

In [7]:
#Create vertices for the writers
nobelVerticesWithType = spark.sql("select subject as id, 'nobelwinner' as vtype from df_nobel_winners ")

#Get both subject and object as the spouce vertex, because both columns have person ids
spouseVerticesWithType = spark.sql("select object as id, 'spouse' as vtype from df_spouses ")
spouseVerticesWithType.createOrReplaceTempView("spouseVerticesWithType")

#Create vertices for the birth cities:
cityVertices = spark.sql("select object as id, 'city' as vtype from df_cities")
cityVertices.createOrReplaceTempView("cityVertices")

#union vertices
AllVertices = nobelVerticesWithType.union(spouseVerticesWithType).union(cityVertices).distinct()

In [8]:
#Edges
#left is the writer, right is her/his spouse
SpouceEdges = df_spouses.select("subject","object","predicate")\
                        .withColumnRenamed("subject","src").withColumnRenamed("object","dst")

#left is the person, right is her/his birth city
CityEdges = df_cities.select("subject","object","predicate")\
                        .withColumnRenamed("subject","src").withColumnRenamed("object","dst")    
    
#union edges
AllEdges = SpouceEdges.union(CityEdges)

In [10]:
#Create the tripartite Graph
g4 = GraphFrame(AllVertices,AllEdges)

In [11]:
#Create the motif

# n: nobel winner vertex
# s: spouse vertex
# c: city vertex
motif4 = g4.find("(n)-[e1]->(s);(s)-[e2]->(c);(n)-[e3]->(c)")\
        .filter("e1.predicate ='<isMarriedTo>'")\
        .filter("e2.predicate ='<wasBornIn>'")\
        .filter("e3.predicate ='<wasBornIn>'")\
        .filter("n.id != s.id")\
        .filter("n.vtype = 'nobelwinner'")\
        .filter("s.vtype = 'spouse'")\
        .filter("c.vtype = 'city'")

In [14]:
#Show the motif that was created above with selected meaningful column names
#The motif shows the top 20 Nobel prize winners with alphabetical order, who were born in the same city as their spouses
motif4.sort("n.id", ascending=True)\
      .withColumnRenamed("n","nobel_winner")\
      .withColumnRenamed("s","spouse")\
    .withColumnRenamed("c","city")\
      .select("nobel_winner", "spouse", "city").show(20, truncate = False) 

+--------------------------------------+---------------------------------+-----------------------+
|nobel_winner                          |spouse                           |city                   |
+--------------------------------------+---------------------------------+-----------------------+
|[<Carl_Ferdinand_Cori>, nobelwinner]  |[<Gerty_Cori>, spouse]           |[<Prague>, city]       |
|[<Frédéric_Joliot-Curie>, nobelwinner]|[<Irène_Joliot-Curie>, spouse]   |[<Paris>, city]        |
|[<Gerty_Cori>, nobelwinner]           |[<Carl_Ferdinand_Cori>, spouse]  |[<Prague>, city]       |
|[<Irène_Joliot-Curie>, nobelwinner]   |[<Frédéric_Joliot-Curie>, spouse]|[<Paris>, city]        |
|[<Robert_Hofstadter>, nobelwinner]    |[<Douglas_Hofstadter>, spouse]   |[<New_York_City>, city]|
+--------------------------------------+---------------------------------+-----------------------+



From the resuls above we understand that for 2 couples both husband and wife received a nobel prize (Carl_Ferdinand_Cori, Gerty_Cori) and (Frédéric_Joliot-Curie, Irène_Joliot-Curie).

In [13]:
#Show the total count of the entries in the motif
motif4.count()

5

## 5. Politicians that are affiliated with a right-wing party (Question E)

We are looking for all connections of the form `polician -> party`, where party is a right-wing party and politicians are defined above. If one politician is associated with several right wing parties, you may count them several times.

Use `'<isAffiliatedTo>'` to find membership in organisations and `'<wikicat_Right-wing_parties>'` for right-wing parties organisations.

There are multiple ways to do this.

Please sort the output alphabetically by the person (politician) column.

In [123]:
# your code
#Create a subset of the df_subclasses, for the right wing parties.
df_sub_parties = spark.sql("select * from df_subclasses where object == '<wikicat_Right-wing_parties>' and predicate='rdf:type'")
df_sub_parties.createOrReplaceTempView("df_sub_parties")

#The df_sub_politician was created on the first question

In [124]:
#From the df get only the records where the subjects are politicians and the objects parties
df_pol_parties = spark.sql("select df1.* from (select df.* from df inner join df_sub_politician as pol on df.subject = pol.subject) df1\
                       inner join df_sub_parties as par on df1.object = par.subject")
df_pol_parties.createOrReplaceTempView("df_pol_parties")

In [126]:
#Create the graph
v5, e5 = vertices_edges_split(df_pol_parties, "predicate='<isAffiliatedTo>'")
g5 = GraphFrame(v5, e5)

In [127]:
#Create the motif
motif5 = g5.find("(a)-[e]->(b)").filter("e.predicate ='<isAffiliatedTo>'")


In [None]:
# Show the top 20 alphabetically politicians that are affiliated with a right-wing party
motif5.sort("a.id", ascending=True)\
      .withColumnRenamed("a","politician")\
      .withColumnRenamed("b","party")\
      .select("politician", "party").show(20, truncate = False) 

+---------------------------------+----------------------------------------+
|politician                       |party                                   |
+---------------------------------+----------------------------------------+
|[<A.N.M._Ehsanul_Hoque_Milan>]   |[<Bangladesh_Nationalist_Party>]        |
|[<A._A._Wijethunga>]             |[<United_National_Party>]               |
|[<A._B._Colton>]                 |[<Republican_Party_(United_States)>]    |
|[<A._C._Clemons>]                |[<Republican_Party_(United_States)>]    |
|[<A._C._Gibbs>]                  |[<Republican_Party_(United_States)>]    |
|[<A._C._Hamlin>]                 |[<Republican_Party_(United_States)>]    |
|[<A._Clifford_Jones>]            |[<Republican_Party_(United_States)>]    |
|[<A._Dean_Jeffs>]                |[<Republican_Party_(United_States)>]    |
|[<A._F._M._Ahsanuddin_Chowdhury>]|[<Jatiya_Party_(Ershad)>]               |
|[<A._G._Crowe>]                  |[<Republican_Party_(United_States)>]    |

In [None]:
#Show the total count of the entries in the motif
motif5.count()

32243