# Data import from the dataset

In order to import the data, we have created a dataset distributed in different *.csv* files.

<ul>
<li style="font-weight: 300; font-style:italic">author.csv</li>
<li style="font-weight: 300; font-style:italic">publication.csv</li>
<li style="font-weight: 300; font-style:italic">reference.csv</li>
<li style="font-weight: 300; font-style:italic">venue.csv</li>
<li style="font-weight: 300; font-style:italic">writes.csv</li>
</ul>

*reference.csv* contains two columns of publication IDs; each row represents a reference from one publication to another.

Similarly, *writes.csv* contains an *author_id* column and a *publication_id* column; each row links an author to a publication they wrote.

The remaining *.csv* files are self-explanatory.

In [None]:
# Import the basic spark library
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
from pyspark.sql.functions import from_json, col, count, when, array_contains, lower

# Build (or reuse) the SparkSession, the entry point to the PySpark application.
# The master is either the URL of a remote Spark instance or 'local'.
spark = (
    SparkSession.builder
    .master("local")
    .appName("NoSQLProject")
    .getOrCreate()
)

In [None]:
# Static schemas for the five CSV files of the dataset.
# Fields without an explicit `nullable` use StructField's default (nullable=True).

# publication.csv ('keywords' is read as a plain string here)
schemapub = StructType([
    StructField('id', StringType(), nullable=False),
    StructField('title', StringType(), nullable=False),
    StructField('page_start', IntegerType()),
    StructField('page_end', IntegerType()),
    StructField('year', IntegerType()),
    StructField('citations', IntegerType()),
    StructField('venue', StringType()),
    StructField('keywords', StringType()),
])

# reference.csv: two publication-id columns
schemaref = StructType([
    StructField('references', StringType(), nullable=False),
    StructField('referenced', StringType(), nullable=False),
])

# venue.csv
schemavenue = StructType([
    StructField('name', StringType(), nullable=False),
    StructField('type', StringType()),
])

# writes.csv: an author id and a publication id
schemawrites = StructType([
    StructField('author', StringType(), nullable=False),
    StructField('publication', StringType(), nullable=False),
])

# author.csv
schemaauthor = StructType([
    StructField('id', StringType(), nullable=False),
    StructField('name', StringType(), nullable=False),
    StructField('org', StringType()),
])

In [None]:
# Load every CSV file of the dataset into a DataFrame with its static schema.

def _load_csv(path, schema):
    """Read a ';'-delimited CSV file that has a header row, applying `schema`."""
    return spark.read.options(header=True, delimiter=";").schema(schema).csv(path)


pub = _load_csv("dataset/publication.csv", schemapub)
# Parse the JSON string held in 'keywords' into an array of strings.
pub = pub.withColumn('keywords', from_json('keywords', ArrayType(StringType())))

ref = _load_csv("dataset/reference.csv", schemaref)

ven = _load_csv("dataset/venue.csv", schemavenue)

writes = _load_csv("dataset/writes.csv", schemawrites)

author = _load_csv("dataset/author.csv", schemaauthor)

#### Publications

In [None]:
# Print the schema of the publication DataFrame, then preview its first 5 rows.
pub.printSchema()
pub.show(5)

#### References


In [None]:
# Print the schema of the reference DataFrame, then preview its first 5 rows.
ref.printSchema()
ref.show(5)

#### Venues

In [None]:
# Print the schema of the venue DataFrame, then preview its first 5 rows.
ven.printSchema()
ven.show(5)

#### Authors write a publication

In [None]:
# Print the schema of the writes (author -> publication) DataFrame, then preview its first 5 rows.
writes.printSchema()
writes.show(5)

#### Authors

In [None]:
# Print the schema of the author DataFrame, then preview its first 5 rows.
author.printSchema()
author.show(5)

# Read Queries

Since the **pyspark** DataFrame API offers functions equivalent to SQL clauses, for most of the queries we first wrote the SQL query as a reference and then implemented it in pyspark.


#### 1. Show the type of publications from 1990 to date. (WHERE and JOIN)

```sql
SELECT p.year, v.type
FROM publication AS p JOIN venue AS v ON p.venue = v.name
WHERE p.year > 1990
```

In [None]:
# Pair every publication with its venue, keep those with year > 1990,
# and display the year together with the venue type.
pubs_with_venue = pub.join(ven, pub.venue == ven.name, 'inner')
recent_pubs = pubs_with_venue.where(pub.year > 1990)
recent_pubs.select('year', 'type').show(truncate=False)

#### 2. Find the titles of publications published in conferences (WHERE and JOIN again)

```sql
SELECT title, name  
FROM publication AS p JOIN venue AS v 
ON p.venue = v.name WHERE v.type = 'C'
```

In [None]:
# Keep only publications whose venue type is 'C', then display the
# publication title next to the venue name.
venue_pubs = pub.join(ven, pub.venue == ven.name, 'inner')
conference_pubs = venue_pubs.where(col('type') == 'C')
conference_pubs.select('title', 'name').show(truncate=True)

#### 3. List publications whose title has "i" as its penultimate letter (WHERE, LIKE and LIMIT)

```sql
SELECT p.title
FROM publication AS p
WHERE title LIKE '%i_'
LIMIT 5
```

In [None]:
# '%i_' matches any title whose second-to-last character is 'i'.
penultimate_i_titles = pub.where(col('title').like('%i_')).select(col('title'))
penultimate_i_titles.limit(5).show(truncate=False)

#### 4. Find publications written by authors affiliated with a specific organization (WHERE, IN, Nested Query)

```sql
SELECT p.title AS Title
FROM publication AS p
WHERE p.id IN ( SELECT w.publication
                FROM writes AS w JOIN author AS a ON w.author = a.id
                WHERE a.org = 'University of Illinois at Chicago.'
              )
```              

In [None]:
# Collect the ids of the publications written by at least one author of the
# target organization (the nested query), then use them as an IN-list.
org_rows = (
    pub.join(writes, writes.publication == pub.id)
       .join(author, author.id == writes.author)
       .where(col('org') == 'University of Illinois at Chicago.')
       .select('publication')
       .collect()
)
org_pubs = [row.publication for row in org_rows]

pub.where(col('id').isin(org_pubs)).show(truncate=True)

#### 5. Show authors that have written at least 2 publications, sorted by number of publications (GROUP BY, JOIN, AS)

```sql
SELECT w.author AS Author, COUNT(*) AS NumberOfPublications
FROM writes AS w JOIN author AS a ON w.author = a.id
GROUP BY a.id, w.author
HAVING COUNT(*) > 1
ORDER BY NumberOfPublications
```

In [None]:
# Count the publications of every author id, keep authors with more than one,
# and display them ordered by that count (ascending).
pubs_per_author = (
    writes.join(author, writes.author == author.id, 'inner')
          .groupBy('author')
          .agg(count('publication').alias('number_of_publications'))
)
prolific_authors = pubs_per_author.where(col('number_of_publications') > 1)
prolific_authors.orderBy('number_of_publications', ascending=True).show(truncate=False)

#### 6. Show publications (title and number of citations) whose title contains the word 'methodology' (GROUP BY and WHERE)

```sql
SELECT p.title, COUNT(*) AS citations
FROM publication AS p JOIN ref AS r ON p.id = r.referenced
WHERE p.title LIKE '%methodology%'
GROUP BY r.referenced, p.title
```

In [None]:
# One joined row per incoming reference; the title match is case-insensitive
# because the title is lower-cased before the LIKE comparison.
incoming_refs = pub.join(ref, ref.referenced == pub.id)
methodology_refs = incoming_refs.where(lower(col('title')).like('%methodology%'))
citation_counts = methodology_refs.groupBy('referenced', 'title').count()
citation_counts.select(
    col('title').alias('Title'),
    col('count').alias('Citations'),
).show(truncate=True)

#### 7. Show the IDs of publications that have cited at least 25 other publications (GROUP BY, HAVING and AS)

```sql
SELECT r.references AS PublicationID, COUNT(*) AS References
FROM ref AS r 
GROUP BY r.references
HAVING COUNT(*) >= 25
````


In [None]:
# Count the outgoing references of every citing publication and keep those
# with at least 25.
outgoing_counts = ref.groupBy('references').count()
heavy_citers = outgoing_counts.where(col('count') >= 25)
heavy_citers.select(
    col('references').alias('PublicationID'),
    col('count').alias('References'),
).show()

#### 8. Show publications with at least 45 citations that contain the keyword 'Data mining' (GROUP BY, HAVING, WHERE and AS)

```sql
SELECT p.title AS Title, p.keywords AS Keywords, COUNT(*) AS citations
FROM publication AS p JOIN ref AS r ON p.id = r.referenced
WHERE array_contains(p.keywords, 'Data mining')
GROUP BY p.title, p.keywords, r.referenced
HAVING COUNT(*) >= 45
```

In [None]:
# One joined row per incoming reference, restricted to publications whose
# keyword array contains 'Data mining'.
data_mining_refs = (
    pub.join(ref, ref.referenced == pub.id)
       .where(array_contains(pub.keywords, 'Data mining'))
)
citation_counts = data_mining_refs.groupBy('referenced', 'title', 'keywords').count()
highly_cited = citation_counts.where(col('count') >= 45)
highly_cited.select(
    col('title').alias('Title'),
    col('keywords').alias('Keywords'),
    col('count').alias('Citations'),
).show(truncate=True)

#### 9. Find publications that contain both the keywords 'Data mining' and 'Computer science' and are referenced more than the average, ordered by the number of citations in ascending order. (WHERE, Nested query, GROUP BY)

```sql
SELECT p.title, COUNT(*) AS citations
FROM publication AS p JOIN ref AS r ON p.id = r.referenced
WHERE array_contains(p.keywords, 'Data mining') AND array_contains(p.keywords, 'Computer science')
GROUP BY p.title, r.referenced
HAVING COUNT(*) > ( SELECT AVG(counter)
                    FROM (SELECT COUNT(*) AS counter
                          FROM ref AS r2
                          GROUP BY r2.referenced
                         )
                  )
ORDER BY citations ASC
```

In [None]:
# Average number of incoming references over all referenced publications.
# A global average does not depend on row order, so no sort is needed.
average = ref.groupBy('referenced').count() \
             .groupBy() \
             .avg('count') \
             .collect()[0][0]

# Restrict to publications carrying both keywords *before* joining and
# aggregating, so only the relevant rows are joined and grouped.
has_both_keywords = array_contains(pub.keywords, 'Computer science') & \
                    array_contains(pub.keywords, 'Data mining')

# Left join: count('references') ignores nulls, so a publication that is never
# referenced gets a count of 0.
pub.filter(has_both_keywords) \
   .join(ref, ref.referenced == pub.id, 'left') \
   .groupBy('title', 'keywords') \
   .agg(count('references').alias('number_of_references')) \
   .filter(col('number_of_references') > average) \
   .sort('number_of_references', ascending=True) \
   .show()

#### 10. Find the publications with the average number of citations and the types of conferences they are in (WHERE, GROUP BY, HAVING, 1 JOIN)

```sql
SELECT p.title AS Title, p.id AS ID, p.venue AS Venue, v.type AS Type
FROM publication AS p JOIN venue AS v ON p.venue = v.name
WHERE p.id IN ( SELECT p2.id 
                FROM publication AS p2 JOIN ref AS r ON p2.id = r.referenced
                GROUP BY p2.id
                HAVING COUNT(*) = ( SELECT AVG(citations)
                                    FROM ( SELECT COUNT(*) AS citations
                                           FROM ref AS r 
                                           GROUP BY r.referenced
                                         )
                                   )
              )
````


In [None]:
# Number of incoming references for every publication that is referenced at least once.
citation_counts = pub.join(ref, ref.referenced == pub.id).groupBy('id').count()

# Average of those counts; int() truncates it so it can be compared with the
# integer counts.
avg = citation_counts.groupBy().avg('count').collect()[0][0]
avg = int(avg)

# Ids of the publications whose citation count equals the truncated average.
avg_rows = citation_counts.where(col('count') == avg).select('id').collect()
avg_pubs = [row.id for row in avg_rows]

# Display those publications together with their venue and venue type.
pub.join(ven, ven.name == pub.venue) \
   .where(col('id').isin(avg_pubs)) \
   .select(
       col('id').alias('ID'),
       col('title').alias('Title'),
       col('venue').alias('Venue'),
       col('type').alias('VType'),
   ) \
   .show(truncate=True)

#### 11. Find the most cited publication(s) in the publications table, list its (their) name(s), ID(s), venue(s) with type and authors (one row per author and publication). (WHERE, GROUP BY, HAVING, 2 JOINs)

```sql
SELECT p.title AS Title, p.id AS ID, p.venue AS Venue, v.type AS Type, a.name AS Author
FROM publication AS p JOIN venue AS v ON p.venue = v.name
        JOIN writes AS w ON w.publication = p.id
        JOIN author AS a ON a.id = w.author
WHERE p.id IN ( SELECT p2.id 
                FROM publication AS p2 JOIN ref AS r ON p2.id = r.referenced
                GROUP BY p2.id
                HAVING COUNT(*) = ( SELECT MAX(citations)
                                    FROM ( SELECT COUNT(*) AS citations
                                           FROM ref AS r
                                           GROUP BY r.referenced
                                        )
                                   
                                   )
               )
```

In [None]:
# Highest number of incoming references over all referenced publications.
# Stored as `max_citations` so that the Python builtin `max` is not shadowed
# for the rest of the notebook.
max_citations = pub.join(ref, ref.referenced == pub.id) \
                   .groupBy('id') \
                   .count() \
                   .groupBy() \
                   .max('count') \
                   .collect()[0][0]

# Ids of the publication(s) reaching that maximum.
max_pubs = pub.join(ref, ref.referenced == pub.id) \
              .groupBy('id') \
              .count() \
              .filter(col('count') == max_citations) \
              .rdd.map(lambda x: x.id).collect()

# One row per (publication, author). The author's `id` and the venue's `name`
# are dropped so that 'id' and 'name' are unambiguous in the select below
# (publication id and author name respectively).
pub.join(ven, ven.name == pub.venue) \
   .filter(col('id').isin(max_pubs)) \
   .join(writes, writes.publication == pub.id) \
   .join(author, writes.author == author.id) \
   .drop(author.id) \
   .drop(ven.name) \
   .select(col('id'), col('title'), col('venue'), col('type'), col('name')) \
   .withColumnRenamed('id', 'ID') \
   .withColumnRenamed('title', 'Title') \
   .withColumnRenamed('venue', 'Venue') \
   .withColumnRenamed('type', 'VType') \
   .withColumnRenamed('name', 'Author') \
   .show(truncate=True)

# Creation/Update/Deletion Queries

#### 1. Add a *pages* column to the publication table holding the number of pages of each article.

In [None]:
# 'pages' is page_end - page_start; rows without a page_start get null, which
# is what when() yields when no branch matches.
page_span = when(col("page_start").isNotNull(), pub['page_end'] - pub['page_start'])
pub = pub.withColumn("pages", page_span)

pub.select('title', 'page_start', 'page_end', 'pages').show()

#### 2. Delete every publication whose 'keywords' are <i>null</i> or that has no citations.

In [None]:
# Keep only publications that have keywords and a positive citation count.
has_keywords = col('keywords').isNotNull()
is_cited = col('citations') > 0
pub = pub.where(has_keywords & is_cited)

#### 3. Add the total number of citations of each author to the "<i>author</i>" table.

In [None]:
# Sum the citations of the publications written by each author id.
citations_per_author = (
    pub.join(writes, writes.publication == pub.id)
       .groupBy('author')
       .sum('citations')
       .withColumnRenamed('sum(citations)', 'total_citations')
       .withColumnRenamed('author', 'author_id')
)

# Attach the totals to the author table.
# - drop('total_citations') is a no-op the first time and lets the cell be re-run.
# - The join compares two distinct columns (author id vs. aggregated author_id).
# - A left join keeps authors that have no publication in `pub`; their total is
#   set to 0 instead of the author disappearing from the table.
author = (
    author.drop('total_citations')
          .join(citations_per_author, col('id') == col('author_id'), 'left')
          .fillna(0, subset=['total_citations'])
          .select('id', 'name', 'org', 'total_citations')
)
author.show()



#### 4. Change every instance of <i>Ecole Polytechnique</i> to <i>Ecole Polytecnique Federale de Lausanne.</i>

In [None]:
# NOTE(review): the replacement text spells 'Polytecnique' without the 'h' —
# confirm this is intended.
old_org = 'Ecole Polytechnique'
new_org = 'Ecole Polytecnique Federale de Lausanne'

# Show the affected authors before the change.
author.where(col('org') == old_org).show(truncate=False)

# Rewrite 'org' where it equals the old name; every other value is kept as is.
renamed_org = when(col('org') == old_org, new_org).otherwise(col('org'))
author = author.withColumn('org', renamed_org)

# Show the authors now carrying the new organization name.
author.where(col('org') == new_org).show(truncate=False)

#### 5. Create a table with only the main keyword for each publication (considering the first keyword the main one).

In [None]:
from pyspark.sql.functions import explode, first, size

# One row per (publication, keyword) pair.
p_exp = pub.select('id', 'title', 'page_start',
                   'page_end', 'pages', 'year',
                   'citations', 'venue',
                   explode(pub.keywords).alias('keyword'))

# The main keyword is the first element of the 'keywords' array, so it is read
# by index. Taking first() over exploded and re-grouped rows is not
# deterministic: row order is not guaranteed after the shuffle that groupBy
# triggers, so any keyword of the publication could be returned.
# The size() guard skips publications with a null or empty keyword array, the
# same rows that explode() would produce nothing for.
p_main_k = (
    pub.where(size(col('keywords')) > 0)
       .select('id', 'title', 'page_start',
               'page_end', 'pages', 'year',
               'citations', 'venue',
               col('keywords')[0].alias('keyword'))
       .sort(col('keyword').asc())
)

p_main_k.select('title', 'keyword').show(1, truncate=False)