In [4]:
from ipywidgets import interact
import viz_functions

import os

# Database credentials are taken from the environment; either value is None
# when the corresponding variable is not set.
postgres_user = os.getenv('POSTGRES_USER')
postgres_password = os.getenv('POSTGRES_PASSWORD')

## Set Up Connection

In [7]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session. The PostgreSQL JDBC driver jar is
# registered through spark.jars so that spark.read.jdbc can load the driver.
spark = (
    SparkSession.builder
    .appName("PostgreSQL Connection with PySpark")
    .config("spark.jars", "/visualization/postgresql-42.6.0.jar")
    .getOrCreate()
)

# JDBC endpoint of the PostgreSQL instance that holds the `papers` schema.
url = "jdbc:postgresql://postgres:5432/airflow"

# Connection properties handed to every spark.read.jdbc call.
# Credentials come from POSTGRES_USER / POSTGRES_PASSWORD (read in the first
# cell) instead of being hardcoded; the previous literals are kept only as a
# fallback so the notebook still runs when the variables are unset or empty.
properties = {
    "user": postgres_user or "airflow",
    "password": postgres_password or "airflow",
    "driver": "org.postgresql.Driver",
}

## Load Tables

In [8]:
# PostgreSQL schema and the tables inside it that the notebook works with.
schema = 'papers'
tables = ['paper', 'author', 'journal', 'keyword']

# Read each table over JDBC, expose it to Spark SQL under its bare table name,
# and print a short summary (schema, first two rows, row count).
for table in tables:
    qualified_name = ".".join((schema, table))
    df = spark.read.jdbc(url=url, table=qualified_name, properties=properties)
    # Register the view so later cells can query it with spark.sql.
    df.createOrReplaceTempView(table)

    print(f"\n########## {table} ##########")
    df.printSchema()
    df.show(2)
    row_total = df.count()
    print(f"Total rows: {row_total}")


########## paper ##########
root
 |-- api_name: string (nullable = true)
 |-- api_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- published_date: date (nullable = true)
 |-- publisher: string (nullable = true)

+--------+---------+--------------------+-------------------+--------------+--------------------+
|api_name|   api_id|               title|                doi|published_date|           publisher|
+--------+---------+--------------------+-------------------+--------------+--------------------+
|    core|137628615|Glucose Homeostas...|10.22028/d291-38686|    2023-01-06|Saarländische Uni...|
|    core|138044939|COMPASSION FOR TH...|                NaN|    2023-01-06|The Graduate Scho...|
+--------+---------+--------------------+-------------------+--------------+--------------------+
only showing top 2 rows

Total rows: 792650

########## author ##########
root
 |-- api_name: string (nullable = true)
 |-- api_id: string (nu

In [9]:
# Distinct week start dates available in the keyword view, most recent first.
week_dates_df = spark.sql("SELECT DISTINCT week_start_date FROM keyword ORDER BY week_start_date DESC")

# Collect the single-column rows on the driver and unwrap them into plain
# values. A list comprehension over collect() yields the same list as
# .rdd.map(...).collect() without an RDD round-trip, and a separate name for
# the DataFrame keeps `dates` from changing type inside the cell.
dates = [row[0] for row in week_dates_df.collect()]

# Every interactive cell below uses `dates` as its dropdown options, so fail
# here with a clear message rather than with a bare IndexError.
if not dates:
    raise ValueError("No week_start_date values found in the 'keyword' view")

current_week = dates[0]
current_week

datetime.date(2023, 10, 30)

## Visualize Tables

In [10]:
@interact
def display_tables(table=tables, nrows=(1, 20, 1)):
    """Show the first rows of one of the registered temp views.

    Parameters
    ----------
    table : str
        Name of the temp view to preview; rendered as a dropdown over `tables`.
    nrows : int
        Number of rows to display; rendered as a slider from 1 to 20.

    Returns
    -------
    pandas.DataFrame
        At most `nrows` rows of the selected view.
    """
    # Apply the limit in Spark so only `nrows` rows are transferred to the
    # driver; converting the full table to pandas first would pull every row
    # just to display a handful.
    return spark.table(table).limit(nrows).toPandas()

interactive(children=(Dropdown(description='table', options=('paper', 'author', 'journal', 'keyword'), value='…

## Top n-grams By Date Interval 

### Barplot

In [12]:
@interact
def int_barplot_grams(start_date=dates, n=(1,3,1), limit=(5,20,1)):
    """Bar plot of the most frequent n-grams for one selected week.

    Parameters
    ----------
    start_date : datetime.date
        Week start date to filter on; dropdown over `dates`.
    n : int
        n-gram size (1 to 3).
    limit : int
        Number of n-grams to show (5 to 20).
    """
    query = """
                SELECT word, count
                FROM keyword
                WHERE week_start_date = '{week_start}'
                AND n = {n}
                ORDER BY count DESC
                LIMIT {limit}
            """.format(week_start=start_date, n=n, limit=limit)
    # The title names the selected week: the dropdown allows any week, so a
    # fixed "This week's ..." label would be wrong for every choice except the
    # most recent one.
    title = f"Most used {n}-grams in the week of {start_date}"
    viz_functions.barplot_ngrams(spark, query, x='word', y='count', title=title)

interactive(children=(Dropdown(description='start_date', options=(datetime.date(2023, 10, 30), datetime.date(2…

### Word Cloud

In [13]:
colormaps = ["gist_heat_r", "Reds_r", "Purples_r", "RdGy_r", "RdBu_r", "CMRmap_r"]

@interact
def it_wordcloud_ngrams(start_date=dates, end_date=dates, n=(1,3,1), limit=(5,20,1), colormap=colormaps):
    """Word cloud of the most frequent n-grams over a range of weeks.

    Parameters
    ----------
    start_date, end_date : datetime.date
        Bounds (inclusive) of the week range; dropdowns over `dates`. The two
        may be given in either order. When both are the same week the result
        covers that single week only.
    n : int
        n-gram size (1 to 3).
    limit : int
        Number of n-grams to include (5 to 20).
    colormap : str
        One of `colormaps`.
        NOTE(review): not forwarded yet, because the signature of
        viz_functions.wordcloud_ngrams is not visible from this notebook;
        confirm whether it accepts a colormap argument and pass it through.
    """
    # Previously `end_date` was ignored and only `start_date` was queried, so
    # the second dropdown had no effect. Order the bounds so that picking them
    # the "wrong" way round still yields a valid interval.
    first_week, last_week = sorted((start_date, end_date))

    # Counts are summed per word across the selected weeks; the result keeps
    # the `word` / `count` column names that the single-week query produced.
    query = """
                SELECT word, SUM(count) AS count
                FROM keyword
                WHERE week_start_date BETWEEN '{first_week}' AND '{last_week}'
                AND n = {n}
                GROUP BY word
                ORDER BY count DESC
                LIMIT {limit}
            """.format(first_week=first_week, last_week=last_week, n=n, limit=limit)
    viz_functions.wordcloud_ngrams(spark, query)

interactive(children=(Dropdown(description='start_date', options=(datetime.date(2023, 10, 30), datetime.date(2…

## Keyword Evolution Through Time 

In [5]:
@interact
def it_lineplot_ngrams(week=dates, n=(1,3,1), limit=(4,20,1)):
    """Line plot of how the top n-grams of a selected week evolve over time.

    The inner query picks the `limit` most frequent n-grams of the selected
    week; the outer query then fetches the weekly percentage of those words
    for every week present in the keyword view.

    Parameters
    ----------
    week : datetime.date
        Reference week whose top n-grams are tracked; dropdown over `dates`.
    n : int
        n-gram size (1 to 3).
    limit : int
        Number of n-grams to track (4 to 20).
    """
    query = """
            SELECT kw.week_start_date, kw.word, kw.week_percentage
            FROM keyword as kw 
            INNER JOIN
            (
                SELECT word, count
                FROM keyword
                WHERE week_start_date = '{week_start}'
                AND n = {n}
                ORDER BY count DESC
                LIMIT {limit} ) as temp 
            ON kw.word = temp.word;
        """.format(week_start=week, n=n, limit=limit)

    # Name the reference week in the title: any week can be chosen from the
    # dropdown, so "this week's" was only accurate for the most recent one.
    title = f"Evolution of the top {n}-grams from the week of {week}"
    viz_functions.lineplot_ngrams(spark, query, x="week_start_date", y="week_percentage", hue="word", title=title)

interactive(children=(Dropdown(description='week', options=(datetime.date(2023, 11, 6), datetime.date(2023, 10…

## Publications per Journal

In [9]:
# Distinct publication years in the paper view, most recent first.
# published_date is nullable, so rows without a date are excluded; otherwise a
# NULL year would become a dropdown option and produce an invalid
# "... = None" filter in the queries built from it.
years_df = spark.sql(
    """
    SELECT DISTINCT EXTRACT(YEAR FROM published_date) as year
    FROM paper 
    WHERE published_date IS NOT NULL
    ORDER BY year DESC
    """)
# Unwrap the single-column rows on the driver; a separate DataFrame name keeps
# `years` from changing type within the cell.
years = [row[0] for row in years_df.collect()]

@interact
def it_barplot_journals(year=years, month=(1,12,1), limit=(5,20,1)):
    """Bar plot of the journals with the most papers in a given month.

    Papers published in the selected year and month are joined to the journal
    view on (api_name, api_id); journals whose name is NULL or the literal
    string 'NaN' are left out, and the rest are counted per (name, issn).

    Parameters
    ----------
    year : int
        Publication year; dropdown over `years`.
    month : int
        Publication month (1 to 12).
    limit : int
        Number of journals to show (5 to 20).
    """
    filters = {'month': month, 'year': year, 'limit': limit}
    query_template = """
                SELECT j.name as Name, j.issn as ISSN, COUNT(*) as count
                FROM journal as j
                INNER JOIN (
                    SELECT api_name, api_id
                    FROM paper
                    WHERE EXTRACT(MONTH FROM published_date) = {month}
                    AND EXTRACT(YEAR FROM published_date) = {year}
                ) as p
                ON (j.api_name, j.api_id) = (p.api_name, p.api_id)
                WHERE j.name IS NOT NULL
                AND j.name != 'NaN'
                GROUP BY j.name, j.issn
                ORDER BY count DESC
                LIMIT {limit};
            """
    query = query_template.format(**filters)
    viz_functions.barplot_journals(spark, query, x='Name', y='count')

interactive(children=(Dropdown(description='year', options=(2023,), value=2023), IntSlider(value=6, descriptio…

## Publications per Author

In [None]:
@interact
def barplot_ngrams(year=years, month=(1,12,1), limit=(5,20,1)):
    """Bar plot of the authors with the most papers in a given month.

    Papers published in the selected year and month are joined to the author
    view on (api_name, api_id); authors whose name is NULL or the literal
    string 'NaN' are left out, and the rest are counted per name.

    NOTE(review): despite its name this function plots authors, not n-grams;
    the name looks like a copy-paste leftover and is kept here only so the
    existing identifier does not change.

    Parameters
    ----------
    year : int
        Publication year; dropdown over `years`.
    month : int
        Publication month (1 to 12).
    limit : int
        Number of authors to show (5 to 20).
    """
    filters = {'month': month, 'year': year, 'limit': limit}
    query_template = """
                SELECT a.name as Name, COUNT(*) as count
                FROM author as a
                INNER JOIN (
                    SELECT api_name, api_id
                    FROM paper
                    WHERE EXTRACT(MONTH FROM published_date) = {month}
                    AND EXTRACT(YEAR FROM published_date) = {year}
                ) as p
                ON (a.api_name, a.api_id) = (p.api_name, p.api_id)
                WHERE a.name IS NOT NULL
                AND a.name != 'NaN'
                GROUP BY a.name
                ORDER BY count DESC
                LIMIT {limit};
            """
    query = query_template.format(**filters)
    viz_functions.barplot_authors(spark, query, x='Name', y='count')

interactive(children=(Dropdown(description='year', options=(2023,), value=2023), IntSlider(value=6, descriptio…