# Data Engineering Project 
## ETL

**Authors**: 
- Dmitri Rozgonjuk
- Eerik Sven Puudist
- Lisanne Siniväli
- Cheng-Han Chung


The aim of this script is to clean the main raw data frame and write a new, clean data frame for further use. In this notebook, the comparisons of different read- and write-methods are demonstrated.

First, we install and import the necessary libraries from one cell (to avoid having libraries in some individual cells below). The packages and their versions to be installed will later be added to the `requirements.txt` file.

We also use this section to set global environment parameters.

In [None]:
#!pip install -r requirements.txt

In [1]:
################### Imports ####################
### Data wrangling
import pandas as pd # working with dataframes

### Database drivers
import psycopg2
from neo4j import GraphDatabase

# Custom scripts 
## Neo4J
import sys
sys.path.append('./dags')
from scripts.raw_to_tables import *
from scripts.augmentations import *
from scripts.final_tables import *
from scripts.sql_queries import *
from scripts.neo4j_queries import *

# MISC
import warnings
from tqdm import tqdm

########## SETTING ENV PARAMETERS ################
warnings.filterwarnings('ignore') # suppress warnings

# 5. Example Queries

## 5.1. Data Warehouse

In [None]:
def pandas_to_dwh():
    """Task that imports .csv-s to pandas, makes the Postgres-connection,
    creates a database, drops existing and creates new tables, and inserts
    the data from pandas.
    """
    # Import the data
    try:
        article = pd.read_csv('dags/data_ready/article.csv', error_bad_lines=False)
        author = pd.read_csv('dags/data_ready/author.csv', error_bad_lines=False)
        authorship = pd.read_csv('dags/data_ready/authorship.csv', error_bad_lines=False)
        category = pd.read_csv('dags/data_ready/category.csv', error_bad_lines=False)
        article_category = pd.read_csv('dags/data_ready/article_category.csv', error_bad_lines=False)
        journal = pd.read_csv('dags/data_ready/journal.csv', error_bad_lines=False)
        tables = [article, author, authorship, category, article_category, journal]

        # Name of tables (for later print)
        article.name = 'article'
        author.name = 'author'
        authorship.name = 'authorship'
        category.name = 'category'
        article_category.name = 'article_category'
        journal.name = 'journal'
        print(article.head(2))
        print(author.head(2))
        print(authorship.head(2))
        print(category.head(2))
        print(article_category.head(2))
        print(journal.head(2))
        print('All tables staged for DWH.')
    except:
        print('Error with importing the data tables')
        sys.exit(1)
       
    # Connect to the database
    try: 
        print('Connecting to Postgres...')
        conn = psycopg2.connect(host="postgres", user="airflow", password="airflow", database ="airflow", port = 5432)
        conn.set_session(autocommit=True)
        cur = conn.cursor()
    except:
        print('Postgres connection not established')
        sys.exit(1)

    try:
        # Insert into tables
        for i in tqdm(range(len(tables))):
            insert_to_tables(cur, tables[i], insert_tables[i])
    except:
        print('Error in inserting the data.')
        print('Error in inserting the data.')

In [None]:
conn = psycopg2.connect(host="postgres", user="airflow", password="airflow", database ="airflow", port = 5432)
conn.set_session(autocommit=True)
cur = conn.cursor()

In [2]:
article = pd.read_csv('dags/data_ready/article.csv', error_bad_lines=False)
author = pd.read_csv('dags/data_ready/author.csv', error_bad_lines=False)
authorship = pd.read_csv('dags/data_ready/authorship.csv', error_bad_lines=False)
category = pd.read_csv('dags/data_ready/category.csv', error_bad_lines=False)
article_category = pd.read_csv('dags/data_ready/article_category.csv', error_bad_lines=False)
journal = pd.read_csv('dags/data_ready/journal.csv', error_bad_lines=False)

In [3]:
article = article[~article['article_id'].isnull()]

In [4]:
authorship = authorship[authorship['article_id'].isin(article['article_id'])].reset_index(drop = True)
author = author[author['author_id'].isin(authorship['author_id'])].reset_index(drop = True)
journal = journal[journal['journal_issn'].isin(article['journal_issn'])].reset_index(drop = True)
article_category = article_category[article_category['article_id'].isin(article['article_id'])].reset_index(drop = True)
category = category[category['category_id'].isin(article_category['category_id'])].reset_index(drop = True)

In [5]:
article.to_csv('dags/data_ready/article.csv', index = False)
author.to_csv('dags/data_ready/author.csv', index = False)
journal.to_csv('dags/data_ready/journal.csv', index = False)
article_category.to_csv('dags/data_ready/article_category.csv', index = False)
category.to_csv('dags/data_ready/category.csv', index = False)

In [None]:
tables = [article, author, authorship, category, article_category, journal]

        # Name of tables (for later print)
article.name = 'article'
author.name = 'author'
authorship.name = 'authorship'
category.name = 'category'
article_category.name = 'article_category'
journal.name = 'journal'

In [None]:
 # Drop Tables 
for query in drop_tables:
    cur.execute(query)
    conn.commit()

In [None]:
for query in create_tables:
    cur.execute(query)
    conn.commit()

In [None]:
def insert_to_tables(cur, table, query):
    ''' Helper function for inserting values to Postresql tables
    Args:
        table (pd.DataFrame): pandas table
        query (SQL query): correspondive SQL query for 'table' for data insertion in DB
    '''
    print(f'Inserting table -- {table.name} -- ...')
    
    try:
        for i, row in table.iterrows():
            cur.execute(query, list(row))
        print(f'Table -- {table.name} -- successfully inserted!')
    except:
        print(f'Error with table -- {table.name} --')
    print()


In [None]:
from scripts.sql_queries import *
for i in tqdm(range(len(tables))):
    insert_to_tables(cur, tables[i], insert_tables[i])

In [15]:
%load_ext sql
%sql postgresql://airflow:airflow@postgres/airflow

In [16]:
%sql SELECT COUNT(*) FROM article;

 * postgresql://airflow:***@postgres/airflow
1 rows affected.


count
15240


In [None]:
%sql SELECT COUNT(*) FROM author;

In [None]:
%sql SELECT * FROM author LIMIT 3;

In [None]:
%sql SELECT * FROM article LIMIT 3;

In [None]:
%sql SELECT * FROM article_category LIMIT 3;

In [None]:
%sql SELECT * FROM category LIMIT 3;

In [None]:
%sql SELECT * FROM journal LIMIT 3;

### Who are the top 0.01% scientists with the most publications in the sample?
Outcome: list of 0.01% top scientists, count of publications, ranking in terms of the total sample.

In [29]:
%%sql query_one <<
SELECT author_id, rank_total_pubs as rank, total_pubs as publications
FROM author 
ORDER BY rank_total_pubs 
LIMIT  0.01 * (SELECT COUNT(*) FROM author) / 100;

 * postgresql://airflow:***@postgres/airflow
3 rows affected.
Returning data to local variable query_one


In [30]:
query_one

author_id,rank,publications
WangY,1,163
WangX,2,158
ZhangJ,3,154


### Proportionally, in which journals have the top 0.01% of scientists (in terms of publication count) published their work the most?

In [23]:
%%sql query_two <<
SELECT final.author_id, final.rank, final.publications, final.journal_title as top_journal,  TO_CHAR((final.number * 100 / final.publications), 'fm99%') as percentage_of_all_publications
FROM (select a.author_id, rank, publications, mode() within group (order by j.journal_title) AS journal_title, COUNT(j.journal_title) as number
      from (SELECT author_id, rank_total_pubs as rank, total_pubs as publications
      FROM author 
      ORDER BY rank_total_pubs 
      LIMIT  0.01 * (SELECT COUNT(*) FROM author) / 100) AS a
      INNER JOIN authorship au ON a.author_id = au.author_id
      INNER JOIN article ar ON au.article_id = ar.article_id
      INNER JOIN journal j ON ar.journal_issn = j.journal_issn
      group by a.author_id, rank, publications,j.journal_title
      having j.journal_title = mode() within group (order by j.journal_title)) as final
LEFT JOIN (select a.author_id, rank, publications, mode() within group (order by j.journal_title) AS journal_title, COUNT(j.journal_title) as number
      from (SELECT author_id, rank_total_pubs as rank, total_pubs as publications
      FROM author 
      ORDER BY rank_total_pubs 
      LIMIT  0.01 * (SELECT COUNT(*) FROM author) / 100) AS a
      INNER JOIN authorship au ON a.author_id = au.author_id
      INNER JOIN article ar ON au.article_id = ar.article_id
      INNER JOIN journal j ON ar.journal_issn = j.journal_issn
      group by a.author_id, rank, publications,j.journal_title
      having j.journal_title = mode() within group (order by j.journal_title)) as final1 ON 
    final.author_id = final1.author_id AND final.number < final1.number
WHERE final1.author_id IS NULL
ORDER BY final.rank 
LIMIT  0.01 * (SELECT COUNT(*) FROM author) / 100;

 * postgresql://airflow:***@postgres/airflow
3 rows affected.
Returning data to local variable query_two


In [24]:
query_two

author_id,rank,publications,top_journal,percentage_of_all_publications
WangY,1,163,IEEE Transactions on Image Processing,3%
WangY,1,163,Journal of Computational Physics,3%
WangX,2,158,IEEE Signal Processing Letters,1%


### What was the most productive year (N publications) for top 0.01% scientists?

In [33]:
%%sql query_three <<

SELECT final.author_id, final.rank, final.year AS most_influential_year, final.pub AS count_of_pub, final.avg_cites
FROM (SELECT a.author_id, rank, count(ar.year) as pub, ar.year, (sum(ar.n_cites::DECIMAL)::int) / count(ar.year) as avg_cites
    FROM (SELECT author_id, rank_total_pubs as rank
    FROM author
    ORDER BY rank_total_pubs 
    LIMIT  0.01 * (SELECT COUNT(*) FROM author) / 100) AS a
    INNER JOIN authorship au ON a.author_id = au.author_id
    INNER JOIN article ar ON au.article_id = ar.article_id
    GROUP BY a.author_id, rank, ar.year) as final
LEFT JOIN (SELECT a.author_id, rank, count(ar.year) as pub, ar.year, (sum(ar.n_cites::DECIMAL)::int) / count(ar.year) as avg_cites
    FROM (SELECT author_id, rank_total_pubs as rank
    FROM author 
    ORDER BY rank_total_pubs 
    LIMIT  0.01 * (SELECT COUNT(*) FROM author) / 100) AS a
    INNER JOIN authorship au ON a.author_id = au.author_id
    INNER JOIN article ar ON au.article_id = ar.article_id
    GROUP BY a.author_id, rank, ar.year) as final1 ON 
    final.author_id = final1.author_id AND final.avg_cites < final1.avg_cites
WHERE final1.author_id IS NULL
ORDER BY final.rank 
LIMIT  0.01 * (SELECT COUNT(*) FROM author) / 100;

 * postgresql://airflow:***@postgres/airflow
3 rows affected.
Returning data to local variable query_three


In [34]:
query_three

author_id,rank,most_influential_year,count_of_pub,avg_cites
WangY,1,2018,7,250
WangX,2,2017,4,85
ZhangJ,3,2016,7,229


### What was the most influential (in terms of N citations/ N publications) year for top 3% scientists?
Outcome: list of (a) top 3% scientists, most influential year, count of publications for that year, average N of citations per publication.

In [27]:
%%sql query_four <<

SELECT final.author_id, final.rank, final.publications, final.most_productive_year, final.count_of_pub
FROM (SELECT a.author_id, rank, publications, ar.year AS most_productive_year, count(ar.year) as count_of_pub
    FROM (SELECT author_id, rank_total_pubs as rank, total_pubs as publications
    FROM author 
    ORDER BY rank_total_pubs 
    LIMIT  0.01 * (SELECT COUNT(*) FROM author) / 100) AS a
    INNER JOIN authorship au ON a.author_id = au.author_id
    INNER JOIN article ar ON au.article_id = ar.article_id
    GROUP BY a.author_id, rank, publications, ar.year) as final
LEFT JOIN (SELECT a.author_id, rank, publications, ar.year AS most_productive_year, count(ar.year) as count_of_pub
    FROM (SELECT author_id, rank_total_pubs as rank, total_pubs as publications
    FROM author 
    ORDER BY rank_total_pubs 
    LIMIT  0.01 * (SELECT COUNT(*) FROM author) / 100) AS a
    INNER JOIN authorship au ON a.author_id = au.author_id
    INNER JOIN article ar ON au.article_id = ar.article_id
    GROUP BY a.author_id, rank, publications, ar.year) as final1 ON 
    final.author_id = final1.author_id AND final.count_of_pub < final1.count_of_pub
WHERE final1.author_id IS NULL
ORDER BY final.rank 
LIMIT  0.01 * (SELECT COUNT(*) FROM author) / 100;

 * postgresql://airflow:***@postgres/airflow
3 rows affected.
Returning data to local variable query_four


In [28]:
query_four

author_id,rank,publications,most_productive_year,count_of_pub
WangY,1,163,2022,25
WangX,2,158,2022,20
ZhangJ,3,154,2022,30


## 5.2. Graph Database

In [6]:
conn_neo = Neo4jConnection(uri='bolt://neo:7687', user='', pwd='')

### Number of articles

In [7]:
result = conn_neo.query('MATCH (n:Article) RETURN COUNT(n) AS ct')
print(result[0]['ct'])

15240


### Number of authors

In [8]:
result = conn_neo.query('MATCH (n:Author) RETURN COUNT(n) AS ct')
print(result[0]['ct'])

29070


In [10]:
result_warmup1 = conn_neo.query("""
        MATCH (n)
        OPTIONAL MATCH (n)-[r]->()
        RETURN count(n.prop) + count(r.prop)
        """)
print(f'Warm-up query result: {result_warmup1}')


result_warmup2 = conn_neo.query('MATCH (n:Article) RETURN COUNT(n) AS ct')
print(result_warmup2[0]['ct'])

result_warmup3 = conn_neo.query('MATCH (n:Author) RETURN COUNT(n) AS ct')
print(result_warmup3[0]['ct'])

result_warmup4 = conn_neo.query('MATCH (n:Journal) RETURN COUNT(n) AS ct')
print(result_warmup4[0]['ct'])

result_warmup5 = conn_neo.query('MATCH (n:Category) RETURN COUNT(n) AS ct')
print(result_warmup5[0]['ct'])

result_warmup4 = conn_neo.query("""
        MATCH (n)
        OPTIONAL MATCH (n:Author)-[r:AUTHORED]->(n2:Article)
        RETURN count(r)
        """)
print(result_warmup4)

Warm-up query result: [<Record count(n.prop) + count(r.prop)=0>]
15240
29070
1600
135
[<Record count(r)=0>]


In [None]:
# Delete
# conn_neo.query('MATCH (n) OPTIONAL MATCH (n)-[r]-() DELETE n, r')

# Delete a specific node (here: all Author nodes):
#conn_neo.query("""MATCH (n:Author) DELETE n""")

In [None]:
print('Setting constraints to unique IDs...')
            # Add ID uniqueness constraint to optimize queries
conn_neo.query('CREATE CONSTRAINT ON(n:Category) ASSERT n.id IS UNIQUE')
conn_neo.query('CREATE CONSTRAINT ON(j:Journal) ASSERT j.id IS UNIQUE')
conn_neo.query('CREATE CONSTRAINT ON(au:Author) ASSERT au.id IS UNIQUE')
conn_neo.query('CREATE CONSTRAINT ON(ar:Article) ASSERT ar.id IS UNIQUE')
print('Constraints to unique IDs successfully set!')

In [None]:
print("Adding 'category' nodes to Neo4J...")
add_category(conn_neo, category)
print("'category' added to Neo4J!")

In [None]:
print("Adding 'journal' nodes to Neo4J...")
add_journal(conn_neo, journal)
print("'journal' added to Neo4J!")

In [None]:
print("Adding 'article' nodes to Neo4J...")
add_article(conn_neo, article)
print("'article' added to Neo4J!")
    

In [None]:
print("Adding 'author' nodes to Neo4J...")
add_author(conn_neo, author)
print("'author' added to Neo4J!")

In [12]:
print("Adding 'article_category' relationship to Neo4J...")
add_article_category(conn_neo, article_category)
print("'article_category' added to Neo4J!")

Adding 'article_category' relationship to Neo4J...
'article_category' added to Neo4J!


In [11]:
print("Adding 'authorship' relationship to Neo4J...")
add_authorship(conn_neo, authorship)
print("'authorship' added to Neo4J!")

Adding 'authorship' relationship to Neo4J...
'authorship' added to Neo4J!


In [13]:
# Add co-authorship
conn_neo.query("""
    MATCH (author1:Author) - [:AUTHORED] -> (article:Article) <-[:AUTHORED] - (author2:Author)
    CREATE (author1)-[new:COAUTHORS]->(author2)
    RETURN type(new);
    """)
print("Added co-authorship relations.")

Added co-authorship relations.


In [None]:

result = conn_neo.query(
"""
MATCH (author:Author)-[:AUTHORED]->(article:Article) 
WHERE author.id = "GousiosG" 
WITH author, COUNT(article) AS number_of_articles, collect(article) AS articles
ORDER BY number_of_articles DESC b
UNWIND articles AS article
MATCH (coauthor:Author)-[:AUTHORED]->(article)
RETURN article, collect(coauthor), COUNT(article), COUNT(coauthor)
"""
)
#for r in result:
#    print(r)
display(result)

In [None]:
# Ego-network WITH the author
MATCH (author:Author)-[:AUTHORED]->(article:Article) 
WHERE author.id = "BirkedalL" 
WITH author, COUNT(article) AS number_of_articles, collect(article) AS articles
ORDER BY number_of_articles DESC 
UNWIND articles AS article
MATCH (coauthor:Author)-[:AUTHORED]->(article)
RETURN article, collect(coauthor), COUNT(article), COUNT(coauthor)

In [None]:
# Ego-network WITHOUT the author
# https://stackoverflow.com/questions/28816222/finding-a-list-of-neo4j-nodes-which-have-the-most-relationships-back-to-another
MATCH (author:Author)-[:AUTHORED]->(article:Article) 
WHERE author.id = "BirkedalL" 
WITH author, COUNT(article) AS number_of_articles, collect(article) AS articles
ORDER BY number_of_articles DESC 
UNWIND articles AS article
MATCH (coauthor:Author)-[:AUTHORED]->(article)
WHERE coauthor <> author
RETURN article, collect(coauthor)

In [None]:
article[article['article_id'].isnull()]

In [None]:
article[article['doi'].isnull()]