Load the trimmed version of the datasets "output_author", "output_book", "output_incollection"

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 56.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=ff8b6642acc5f717fa65b483d98d4be79075d1c1edf7248541a82356911ec5da
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


### Data loading and DataFrame setup

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import count

# Create an entry point to the PySpark Application
spark = SparkSession.builder \
      .master("local") \
      .appName("MyFirstSparkApplication") \
      .getOrCreate()
# master contains the URL of your remote spark instance or 'local'

In [5]:
# Load Author DataFrame
df_author = spark.read.options(header= True,inferSchema=True,delimiter=";").csv("output_author.csv")

#Print detected schema 
df_author.printSchema()

df_author.show()

root
 |-- ID: integer (nullable = true)
 |-- author: string (nullable = true)
 |-- orcid: string (nullable = true)

+-------+------------------+-------------------+
|     ID|            author|              orcid|
+-------+------------------+-------------------+
|9456800|    Russell Turpin|5795-9471-2826-0825|
|9456801|       Frank Olken|8807-3389-1302-0821|
|9456802|      Guido Frisch|8820-5977-9510-2436|
|9456803|       Piero Borga|5788-5737-3147-7886|
|9456804|      Roberto Mura|8445-3948-0176-8011|
|9456805|    Alessia Amelio|9386-1151-9838-0626|
|9456806|   Antonio Filieri|4698-3195-2592-9346|
|9456807|       Marco Bessi|5938-3585-4978-9699|
|9456808| Riccardo Desimini|2190-1298-3984-5639|
|9456809|      Luca Cremona|8296-8810-9545-3438|
|9456810|   Carmine Miccoli|9115-5513-4896-5275|
|9456811|Daniele Dell'Aglio|8627-6693-0362-6597|
|9456812|   Sonia Cenceschi|8394-5631-2089-4980|
|9456813|     Carlo D'Eramo|0301-9277-4230-0827|
|9456814|Nicola Carapellese|5672-2690-3840-1187|
|9

In [6]:
# Load Book DataFrame
df_book = spark.read.options(header= True,inferSchema=True,delimiter=";").csv("output_book.csv")

# Print detected schema
df_book.printSchema()

df_book.show()

root
 |-- ID: integer (nullable = true)
 |-- author: string (nullable = true)
 |-- author-bibtex: string (nullable = true)
 |-- author-orcid: string (nullable = true)
 |-- booktitle: string (nullable = true)
 |-- cdrom: string (nullable = true)
 |-- cite: string (nullable = true)
 |-- cite-label: string (nullable = true)
 |-- crossref: string (nullable = true)
 |-- editor: string (nullable = true)
 |-- editor-orcid: string (nullable = true)
 |-- ee: string (nullable = true)
 |-- ee-type: string (nullable = true)
 |-- i: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn-type: string (nullable = true)
 |-- key: string (nullable = true)
 |-- mdate: timestamp (nullable = true)
 |-- month: string (nullable = true)
 |-- note: string (nullable = true)
 |-- note-type: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publisher-href: string (nullable = true)
 |-- publtype: string (nullable = true)
 |-- school: string

In [7]:
#Dropping irrelevant columns
df_book = df_book.drop(df_book["author-bibtex"]).drop(df_book["author-orcid"]).drop(df_book["cdrom"]).drop(df_book["cite"]).drop(df_book["cite-label"]) \
  .drop(df_book["editor-orcid"]).drop(df_book["ee-type"]).drop(df_book["i"]).drop(df_book["isbn-type"]).drop(df_book["note"]) \
  .drop(df_book["note-type"]).drop(df_book["publisher-href"]).drop(df_book["publtype"]).drop(df_book["series-href"]) \
  .drop(df_book["sub"]).drop(df_book["sup"])
df_book.show()

+----+--------------------+---------+--------+------+--------------------+-----------------+--------------------+-------------------+-----+-----+--------------------+--------------------+------+--------------------+----+------+----+
|  ID|              author|booktitle|crossref|editor|                  ee|             isbn|                 key|              mdate|month|pages|           publisher|              school|series|               title| url|volume|year|
+----+--------------------+---------+--------+------+--------------------+-----------------+--------------------+-------------------+-----+-----+--------------------+--------------------+------+--------------------+----+------+----+
| 937|    Michael A. Curth|     null|    null|  null|https://d-nb.info...|978-3-89012-177-2|     phd/dnb/Curth89|2021-07-17 00:00:00| null|1-370|        Eul, Germany|                null|  null|Planspieltechnik ...|null|  null|1989|
| 974|Eberhard E. Wedekind|     null|    null|  null|https://d-nb.in

In [8]:
# Load Incollection DataFrame
df_incollection = spark.read.options(header= True,inferSchema=True,delimiter=";").csv("output_incollection.csv")

# Print detected schema
df_incollection.printSchema()

df_incollection.show()

root
 |-- ID: integer (nullable = true)
 |-- author: string (nullable = true)
 |-- author-orcid: string (nullable = true)
 |-- booktitle: string (nullable = true)
 |-- cdrom: string (nullable = true)
 |-- chapter: string (nullable = true)
 |-- cite: string (nullable = true)
 |-- cite-label: string (nullable = true)
 |-- crossref: string (nullable = true)
 |-- ee: string (nullable = true)
 |-- ee-type: string (nullable = true)
 |-- i: string (nullable = true)
 |-- key: string (nullable = true)
 |-- mdate: timestamp (nullable = true)
 |-- note: string (nullable = true)
 |-- number: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publisher-href: string (nullable = true)
 |-- publtype: string (nullable = true)
 |-- sub: string (nullable = true)
 |-- sup: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- year: integer (nullable = true)

+-----+--------------------+----------------

In [9]:
#Dropping irrelevant columns
df_incollection = df_incollection.drop(df_incollection["author-orcid"]).drop(df_incollection["cdrom"]).drop(df_incollection["cite"]) \
  .drop(df_incollection["cite-label"]).drop(df_incollection["ee-type"]).drop(df_incollection["i"]).drop(df_incollection["note"]).drop(df_incollection["publisher-href"]) \
  .drop(df_incollection["publtype"]).drop(df_incollection["sub"]).drop(df_incollection["sup"]).drop(df_incollection["publtype"])
df_incollection.show()

+-----+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------------+------+---------+---------+--------------------+--------------------+----+
|   ID|              author|           booktitle|chapter|            crossref|                  ee|                 key|              mdate|number|    pages|publisher|               title|                 url|year|
+-----+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------------+------+---------+---------+--------------------+--------------------+----+
|95284|        Lynne Parker|Handbook of Compu...|   null| reference/sp/2015ci|https://doi.org/1...|reference/sp/Park...|2017-05-16 00:00:00|  null|1395-1406|     null|Collective Manipu...|db/reference/sp/c...|2015|
|95285|Oscar Montiel Ros...|Handbook of Compu...|   null| reference/sp/2015ci|https://doi.org/1...|reference/sp/RossC15|2017-06-09 00:00:00|

### 5 initial queries

In [10]:
#adding three authors with method 1 - slower (creates a list of the entire dataframe and appends)
author_collection = df_author.collect()
author_collection.append({"ID" : 0, "author" : "Francesco Scandale", "orcid" : "0101-9292-8383-7474"})
author_collection.append({"ID" : 1, "author" : "Marta Radaelli", "orcid" : "2345-5678-9823-0022"})
author_collection.append({"ID" : 2, "author" : "Roberto Macaccaro", "orcid" : "3455-4292-8345-1236"})
df_author = spark.createDataFrame(author_collection)

In [11]:
#adding two authors with method 2 - faster (union of two more dataframes)
columns = ['ID', 'author', 'orcid']
vals = [("3", "Elena Montemurro", "2345-9384-4747-1298"),
        ("4", "Luca Rondini", "1234-9876-6574-3456")]

newRow = spark.createDataFrame(vals, columns)
df_author = df_author.union(newRow)

In [12]:
#adding a book with us as authors
columns = ["ID","author","booktitle","crossref","editor","ee","isbn","key","mdate","month","pages","publisher","school", \
           "series","title","url","volume","year"]
vals = [("10","Francesco Scandale|Marta Radaelli","SMBUD book","null","Luca Rondini","www.smbud.it/book","123-456-78914-35","sbmud/book","2022-12-5","null","1-150", \
        "Springer","PoliMi","null","SMBUD book","www.smbud.it/book","1","2022")]

newBook = spark.createDataFrame(vals, columns)
df_book = df_book.union(newBook)
df_book.filter(df_book["ID"]=="10").show()

+---+--------------------+----------+--------+------------+-----------------+----------------+----------+---------+-----+-----+---------+------+------+----------+-----------------+------+----+
| ID|              author| booktitle|crossref|      editor|               ee|            isbn|       key|    mdate|month|pages|publisher|school|series|     title|              url|volume|year|
+---+--------------------+----------+--------+------------+-----------------+----------------+----------+---------+-----+-----+---------+------+------+----------+-----------------+------+----+
| 10|Francesco Scandal...|SMBUD book|    null|Luca Rondini|www.smbud.it/book|123-456-78914-35|sbmud/book|2022-12-5| null|1-150| Springer|PoliMi|  null|SMBUD book|www.smbud.it/book|     1|2022|
+---+--------------------+----------+--------+------------+-----------------+----------------+----------+---------+-----+-----+---------+------+------+----------+-----------------+------+----+



In [13]:
#update our book: DataFrames are immutable, so we need to delete the inserted one and create a new row
df_book = df_book.filter(df_book["ID"]!="10")
columns = ["ID","author","booktitle","crossref","editor","ee","isbn","key","mdate","month","pages","publisher","school", \
           "series","title","url","volume","year"]
vals = [("10","Francesco Scandale|Marta Radaelli|Luca Rondini|Roberto Macaccaro|Elena Montemurro","SMBUD book","null","Luca Rondini","www.smbud.it/book","123-456-78914-35","sbmud/book","2022-12-5","null","1-150", \
        "Springer","PoliMi","null","SMBUD book","www.smbud.it/book","1","2022")]
updateBook = spark.createDataFrame(vals, columns)
df_book = df_book.union(updateBook)
print(df_book.filter(df_book["ID"]=="10").select("author").collect())

[Row(author='Francesco Scandale|Marta Radaelli|Luca Rondini|Roberto Macaccaro|Elena Montemurro')]


In [14]:
#creating bridge tables between author and book, between author and incollection, between book and incollection
df_bridge_author_book = df_author.withColumnRenamed("ID","authorID").join(df_book.withColumnRenamed("ID","bookID"),df_book["author"].contains(df_author["author"])) \
  .select("bookID","authorID")
df_book = df_book.drop(df_book["author"]) #drop this column for consistency

df_bridge_author_incollection = df_author.withColumnRenamed("ID","authorID").join(df_incollection.withColumnRenamed("ID","incollectionID"),df_incollection["author"].contains(df_author["author"])) \
  .select("incollectionID","authorID")
df_incollection = df_incollection.drop(df_incollection["author"]) #drop this column for consistency

df_bridge_book_incollection = df_incollection.withColumnRenamed("ID","incollectionID").join(df_book.withColumnRenamed("ID","bookID"),df_incollection["booktitle"]==df_book["booktitle"]) \
  .select("bookID","incollectionID")
df_incollection = df_incollection.drop(df_incollection["booktitle"]) #drop this column for consistency

#print the new bridge tables to make the transformations execute
df_bridge_author_book.show()
df_bridge_author_incollection.show()
df_bridge_book_incollection.show()

+-------+--------+
| bookID|authorID|
+-------+--------+
|6406409| 9457040|
|6401514| 9457135|
|6406190| 9457158|
|6401393| 9457394|
|  20030| 9457528|
|6398784| 9457528|
|6400525| 9457528|
|6400649| 9457693|
|  12003| 9457705|
|    937| 9457733|
|   4352| 9457767|
|   4352| 9457768|
|    974| 9457771|
|   1166| 9457963|
|   1358| 9458157|
|6400243| 9458197|
|  21352| 9458318|
|  17128| 9458397|
|   1603| 9458402|
|   1808| 9458608|
+-------+--------+
only showing top 20 rows

+--------------+--------+
|incollectionID|authorID|
+--------------+--------+
|         97843| 9457158|
|         98866| 9457158|
|         99421| 9457158|
|         99529| 9457158|
|         95590| 9457528|
|         95365| 9457671|
|         95512| 9458011|
|        102103| 9458512|
|         95504| 9458999|
|        101279| 9459171|
|         99652| 9459357|
|         95365| 9459427|
|         95329| 9459626|
|        102048| 9459766|
|         97383| 9459871|
|        101405| 9460176|
|        101636| 9460176

In [15]:
#delete Luca Rondini as author of "SMBUD book"
lucaID = df_author.filter(df_author["author"]=="Luca Rondini").select("ID").collect()
df_bridge_author_book = df_bridge_author_book.filter((df_bridge_author_book["authorID"]!=lucaID[0]["ID"]) | (df_bridge_author_book["bookID"]!="10"))
df_bridge_author_book.filter(df_bridge_author_book["bookID"]=="10").show()

+------+--------+
|bookID|authorID|
+------+--------+
|    10|       0|
|    10|       1|
|    10|       2|
|    10|       3|
+------+--------+



In [16]:
#adding an incollection to our book
columns = ["ID","chapter","crossref","ee","key","mdate","number","pages","publisher","title","url","year"]
vals = [("100","1","null","www.smbud.it/book/chapter1","sbmud/book/chapter1","2022-12-4","null","20-35", \
         "Springer","SMBUD book, chapter 1","www.smbud.it/book/chapter1","2022")]

newIncollection = spark.createDataFrame(vals, columns)
df_incollection = df_incollection.union(newIncollection)
df_incollection.filter(df_incollection["ID"]=="100").show()


#modify the bridge tables
columns = ["bookID","incollectionID"]
vals = [("10","100")]
newBookIncollection = spark.createDataFrame(vals, columns)
df_bridge_book_incollection = df_bridge_book_incollection.union(newBookIncollection)
df_bridge_book_incollection.filter(df_bridge_book_incollection["incollectionID"]=="100").show()

columns = ["incollectionID","authorID"]
vals = [("100","3")]
newAuthorIncollection = spark.createDataFrame(vals, columns)
df_bridge_author_incollection = df_bridge_author_incollection.union(newAuthorIncollection)
df_bridge_author_incollection.filter(df_bridge_author_incollection["incollectionID"]=="100").show()

+---+-------+--------+--------------------+-------------------+---------+------+-----+---------+--------------------+--------------------+----+
| ID|chapter|crossref|                  ee|                key|    mdate|number|pages|publisher|               title|                 url|year|
+---+-------+--------+--------------------+-------------------+---------+------+-----+---------+--------------------+--------------------+----+
|100|      1|    null|www.smbud.it/book...|sbmud/book/chapter1|2022-12-4|  null|20-35| Springer|SMBUD book, chapt...|www.smbud.it/book...|2022|
+---+-------+--------+--------------------+-------------------+---------+------+-----+---------+--------------------+--------------------+----+

+------+--------------+
|bookID|incollectionID|
+------+--------------+
|    10|           100|
+------+--------------+

+--------------+--------+
|incollectionID|authorID|
+--------------+--------+
|           100|       3|
+--------------+--------+



### 10 queries on the data - Same order as in the report

In [17]:
#show current data
df_author.show()
df_book.show()
df_incollection.show()

+-------+------------------+-------------------+
|     ID|            author|              orcid|
+-------+------------------+-------------------+
|9456800|    Russell Turpin|5795-9471-2826-0825|
|9456801|       Frank Olken|8807-3389-1302-0821|
|9456802|      Guido Frisch|8820-5977-9510-2436|
|9456803|       Piero Borga|5788-5737-3147-7886|
|9456804|      Roberto Mura|8445-3948-0176-8011|
|9456805|    Alessia Amelio|9386-1151-9838-0626|
|9456806|   Antonio Filieri|4698-3195-2592-9346|
|9456807|       Marco Bessi|5938-3585-4978-9699|
|9456808| Riccardo Desimini|2190-1298-3984-5639|
|9456809|      Luca Cremona|8296-8810-9545-3438|
|9456810|   Carmine Miccoli|9115-5513-4896-5275|
|9456811|Daniele Dell'Aglio|8627-6693-0362-6597|
|9456812|   Sonia Cenceschi|8394-5631-2089-4980|
|9456813|     Carlo D'Eramo|0301-9277-4230-0827|
|9456814|Nicola Carapellese|5672-2690-3840-1187|
|9456815|     Chiara Cimino|6918-0442-7442-4475|
|9456816|    Serena Fiocchi|5333-9253-4493-8941|
|9456817|     Lorenz

In [18]:
#show current data
df_bridge_author_book.show()
df_bridge_book_incollection.show()
df_bridge_author_incollection.show()

+-------+--------+
| bookID|authorID|
+-------+--------+
|6406409| 9457040|
|6401514| 9457135|
|6406190| 9457158|
|6401393| 9457394|
|  20030| 9457528|
|6398784| 9457528|
|6400525| 9457528|
|6400649| 9457693|
|  12003| 9457705|
|    937| 9457733|
|   4352| 9457767|
|   4352| 9457768|
|    974| 9457771|
|   1166| 9457963|
|   1358| 9458157|
|6400243| 9458197|
|  21352| 9458318|
|  17128| 9458397|
|   1603| 9458402|
|   1808| 9458608|
+-------+--------+
only showing top 20 rows

+------+--------------+
|bookID|incollectionID|
+------+--------------+
| 95312|         95677|
| 95312|         95673|
| 95312|         95654|
| 95312|         95634|
| 95312|         95633|
| 95312|         95603|
| 95312|         95593|
| 95312|         95585|
| 95312|         95584|
| 95312|         95581|
| 95312|         95564|
| 95312|         95561|
| 95312|         95546|
| 95312|         95539|
| 95312|         95524|
| 95312|         95517|
| 95312|         95514|
| 95312|         95512|
| 95312|      

In [19]:
#1 - WHERE, JOIN
df_author.filter(col("author")=="Michael A. Curth") \
.join(df_bridge_author_book, df_author.ID == df_bridge_author_book.authorID, "inner") \
.join(df_book, df_book.ID == df_bridge_author_book.bookID, "inner") \
.select("author","title") \
.show(truncate=False)

+----------------+-----------------------------------------------------------------------------------+
|author          |title                                                                              |
+----------------+-----------------------------------------------------------------------------------+
|Michael A. Curth|Planspieltechnik und Computer-based-Training zur Schulung von Einkäufern im Handel.|
+----------------+-----------------------------------------------------------------------------------+



In [20]:
#2 - WHERE, LIMIT, LIKE
df_book.filter((col("year") > 2000) \
  & (col("year") < 2010) \
  & (col("key").like("phd/%")))\
  .limit(15).select("key","publisher","title","year") \
  .show()

+--------------------+--------------------+--------------------+----+
|                 key|           publisher|               title|year|
+--------------------+--------------------+--------------------+----+
|     phd/dnb/Klugl01|      Addison-Wesley|Multiagentensimul...|2001|
|    phd/dnb/Lohrey03|                null|Computational and...|2003|
|    phd/dnb/Kohler03|      Westdt. Verlag|Das Selbst im Net...|2003|
|     phd/dnb/Stach01|                 DUV|Zwischen Organism...|2001|
|phd/ethos/Dunsmore03|University of Str...|Reading technique...|2003|
|   phd/uk/Miguel2004|            Springer|Dynamic flexible ...|2004|
|      phd/us/Kim2008|VDM Verlag Dr. Mü...|Scalable video st...|2008|
|     phd/us/Kurc2008|                 VDM|Parallel computin...|2008|
|    phd/de/Baier2008|VDM Verlag Dr. Mü...|Motion perception...|2008|
|  phd/de/Flentge2007|VDM Verlag Dr. Mü...|Maschinelles Lern...|2007|
| phd/de/Alekseev2007|VDM Verlag Dr. Mü...|Ablaufüberwachung...|2007|
|  phd/de/Janneck200

In [21]:
#3 - WHERE, IN, Nested Query
new_books=df_book.filter(col("year")>=2022).select("ID").collect()
new_books = [new[0] for new in new_books]
df_book.filter(col("ID").isin(new_books)) \
.select("ID","title") \
.show()

+-------+--------------------+
|     ID|               title|
+-------+--------------------+
|6398682|SQL Server Databa...|
|6398685|Digital Humanism ...|
|6398733|Internet of Thing...|
|6398748|Measuring Systemi...|
|6398750| Company Controlling|
|6398767|Assessing the Qua...|
|6398986|Adaptive Compensa...|
|6399161|IoT System Design...|
|6399166|Alternating Direc...|
|6399167|Information Model...|
|6399168|Embedded Robotics...|
|6399203|Blockchain Scalab...|
|6399305|Data Center Netwo...|
|6399347|Non-invasive Moni...|
|6399358|Random Contractio...|
|6399359|Consensus Over Sw...|
|6399360|Signal Processing...|
|6399370|Security of Cyber...|
|6399380|Text Mining for I...|
|6399385|Fundamentals of C...|
+-------+--------------------+
only showing top 20 rows



In [22]:
#4 - GROUP BY, 1 JOIN, AS
query1 = df_book.join(df_bridge_author_book,df_bridge_author_book["bookID"]==df_book["ID"],"inner").groupBy("publisher").count() \
          .withColumnRenamed("count","#authors").filter((col("#authors")>1) & (col("publisher")!="null")).sort(col("#authors").desc()).limit(5)
query1.show()

+--------------------+--------+
|           publisher|#authors|
+--------------------+--------+
|            Springer|     711|
|VDM Verlag Dr. Mü...|      33|
|              Shaker|       7|
|    Dt. Univ.-Verlag|       5|
|          VDI-Verlag|       4|
+--------------------+--------+



In [23]:
#5 - WHERE, GROUP BY
df_book.filter(col("year") == 2000).groupBy("publisher").count().show(truncate = False)

+-----------------+-----+
|publisher        |count|
+-----------------+-----+
|Kluwer / Springer|1    |
|null             |1    |
|Kluwer           |1    |
|GI               |1    |
|Teubner          |1    |
|Springer         |22   |
|World Scientific |1    |
+-----------------+-----+



In [24]:
#6 - GROUP BY, HAVING, AS
df_book.groupBy("publisher").agg(count("ID") \
.alias("Number of book for publisher")) \
.filter((col("Number of book for publisher")>=3)\
& (col("Number of book for publisher")<=7))\
.show(truncate = False)

+-----------------------------------------------------------+----------------------------+
|publisher                                                  |Number of book for publisher|
+-----------------------------------------------------------+----------------------------+
|Kluwer / Springer                                          |7                           |
|Elsevier                                                   |6                           |
|VDI-Verlag                                                 |4                           |
|Oldenbourg                                                 |3                           |
|Birkhäuser / Springer                                      |3                           |
|Schloss Dagstuhl - Leibniz-Zentrum fuer Informatik, Germany|3                           |
|CRC Press                                                  |7                           |
|Dt. Univ.-Verlag                                           |5                           |

In [25]:
#7 - WHERE, GROUP BY, HAVING, AS
query2 = df_book.filter(col("year")>2015).groupBy("publisher"). \
count().withColumnRenamed("count","#books").filter(col("#books")>20). \
sort(col("#books").desc())
query2.show()

+---------------+------+
|      publisher|#books|
+---------------+------+
|       Springer|   688|
|         Apress|    23|
|Springer Vieweg|    23|
+---------------+------+



In [26]:
#8 - WHERE, Nested Query (i.e., 2-step Queries), GROUP BY
springer_books=df_book.filter(col("publisher")=="Springer").select("title").collect()
springer_books = [new[0] for new in springer_books]
df_book.filter(col("title").isin(springer_books)).groupBy("year").count().show(truncate = False)

+----+-----+
|year|count|
+----+-----+
|1987|24   |
|2016|87   |
|2012|26   |
|2020|101  |
|1988|16   |
|2019|111  |
|2017|80   |
|1977|3    |
|2014|86   |
|1984|9    |
|2013|40   |
|1982|6    |
|2005|26   |
|2000|22   |
|1981|13   |
|1974|1    |
|2002|28   |
|2018|105  |
|2009|24   |
|1995|21   |
+----+-----+
only showing top 20 rows



In [27]:
#9 - WHERE, GROUP BY, HAVING, 1 JOIN 
#(not counting the join for the bridge table, since it's just to re-establish the correspondence)
df_book.filter((col("year") > 2000) & (col("year") < 2011)) \
.join(df_bridge_author_book, df_book.ID == df_bridge_author_book.bookID) \
.join(df_author, df_author.ID == df_bridge_author_book.authorID) \
.groupBy(["authorID", "author"]) \
.count().alias("count") \
.filter(col("count")>1) \
.show()

+--------+-------------------+-----+
|authorID|             author|count|
+--------+-------------------+-----+
| 9552217|Oscar Castillo 0001|    2|
| 9769960|             Herman|    3|
| 9563994|  Joris De Schutter|    2|
| 9477300|          Jörg Roth|    2|
| 9552036|     Patricia Melin|    2|
| 9550107|         Jörg Rothe|    2|
+--------+-------------------+-----+



In [None]:
#10 - WHERE, GROUP BY, HAVING, 2 JOINs
df_book.filter(col("year")>1995) \
.join(df_bridge_author_book, df_book.ID == df_bridge_author_book.bookID) \
.join(df_author, df_author.ID == df_bridge_author_book.authorID) \
.filter(col("author").like("A%")) \
.groupBy("year") \
.agg(count("year").alias("count")) \
.filter(col("count")>=2) \
.sort(col("year").desc()) \
.show()


+----+-----+
|year|count|
+----+-----+
|2021|    2|
|2020|    4|
|2018|    8|
|2017|    6|
|2016|    3|
|2014|    5|
|2009|    2|
|2006|    2|
|2005|    3|
|2003|    2|
+----+-----+

