### Importation des librairies

In [54]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

### Instanciation de la spark session

In [55]:
spark = SparkSession.builder.master("local").appName("BDA-SQL").getOrCreate()

### Création des tables

In [56]:
l1 = [('07890', 'Jean Paul Sartre'), ('05678', 'Pierre de Ronsard')]
rdd1 = spark.sparkContext.parallelize(l1)
Author = rdd1.toDF(['aid', 'name'])
Author.createOrReplaceTempView('AuthorSQL')
Author.show()

+-----+-----------------+
|  aid|             name|
+-----+-----------------+
|07890| Jean Paul Sartre|
|05678|Pierre de Ronsard|
+-----+-----------------+



In [57]:
l2 = [('0001', "L'existentialisme est un humanisme", 'Philosophie'), \
      ('0002', 'Huis clos. Suivi de Les Mouches', 'Philosophie'), \
     ('0003', 'Mignonne allons voir si la rose', 'Poeme'), \
     ('0004', 'Les Amours', 'Poème')]
rdd2 = spark.sparkContext.parallelize(l2)
book = rdd2.toDF(['bid', 'title', 'category'])
book.createOrReplaceTempView('bookSQL')
book.show()

+----+--------------------+-----------+
| bid|               title|   category|
+----+--------------------+-----------+
|0001|L'existentialisme...|Philosophie|
|0002|Huis clos. Suivi ...|Philosophie|
|0003|Mignonne allons v...|      Poeme|
|0004|          Les Amours|      Poème|
+----+--------------------+-----------+



In [58]:
l3 = [('S15', 'toto', 'Math'), \
      ('S16', 'popo', 'Eco'), \
     ('S17', 'fofo', 'Mécanique')]
rdd3 = spark.sparkContext.parallelize(l3)
Student = rdd3.toDF(['sid', 'sname', 'dept'])
Student.createOrReplaceTempView('StudentSQL')
Student.show()

+---+-----+---------+
|sid|sname|     dept|
+---+-----+---------+
|S15| toto|     Math|
|S16| popo|      Eco|
|S17| fofo|Mécanique|
+---+-----+---------+



In [59]:
l4 = [('07890', '0001'), \
      ('07890', '0002'), \
     ('05678', '0003'), \
     ('05678', '0003')]
rdd4 = spark.sparkContext.parallelize(l4)
write = rdd4.toDF(['aid', 'bid'])
write.createOrReplaceTempView('writeSQL')
write.show()

+-----+----+
|  aid| bid|
+-----+----+
|07890|0001|
|07890|0002|
|05678|0003|
|05678|0003|
+-----+----+



In [60]:
l5 = [('S15', '0003', '02-01-2020', '01-02-2020'), \
      ('S15', '0002', '13-06-2020', 'null'), \
     ('S15', '0001', '13-06-2020', '13-10-2020'), \
     ('S16', '0002', '24-01-2020', '24-01-2020'), \
     ('S17', '0001', '12-04-2020', '01-07-2020')]
rdd5 = spark.sparkContext.parallelize(l5)
borrow = rdd5.toDF(['sid', 'bid', 'checkout_time', 'return_time'])
borrow.createOrReplaceTempView('borrowSQL')
borrow.show()

+---+----+-------------+-----------+
|sid| bid|checkout_time|return_time|
+---+----+-------------+-----------+
|S15|0003|   02-01-2020| 01-02-2020|
|S15|0002|   13-06-2020|       null|
|S15|0001|   13-06-2020| 13-10-2020|
|S16|0002|   24-01-2020| 24-01-2020|
|S17|0001|   12-04-2020| 01-07-2020|
+---+----+-------------+-----------+



### 1-Trouver les titres de tous les livres que l'étudiant sid='S15' a emprunté

In [61]:
# SQL
spark.sql("""select title
            from bookSQL
            join borrowSQL
            on bookSQL.bid = borrowSQL.bid
            where borrowSQL.sid = 'S15' """).show()

+--------------------+
|               title|
+--------------------+
|Huis clos. Suivi ...|
|Mignonne allons v...|
|L'existentialisme...|
+--------------------+



In [62]:
# DSL
book.join(borrow, 'bid')\
    .select('title')\
    .filter(F.col('sid')=='S15')\
    .show()

+--------------------+
|               title|
+--------------------+
|Huis clos. Suivi ...|
|Mignonne allons v...|
|L'existentialisme...|
+--------------------+



### 2-Trouver les titres de tous les livres qui n'ont jamais été empruntés par un étudiant

In [63]:
# SQL
spark.sql("""select title 
            from bookSQL
            left join borrowSQL
            on bookSQL.bid = borrowSQL.bid
            where borrowSQL.bid is null""").show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



In [64]:
# DSL
book.join(borrow, book.bid==borrow.bid, how='left')\
    .select('title')\
    .filter(F.col('sid').isNull())\
    .show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



### 3-Trouver tous les étudiants qui ont emprunté le livre bid=’0002’

In [65]:
# SQL
spark.sql("""select sname
            from StudentSQL
            join borrowSQL
            on StudentSQL.sid = borrowSQL.sid
            where borrowSQL.bid = '0002' """).show()

+-----+
|sname|
+-----+
| popo|
| toto|
+-----+



In [66]:
# DSL
Student.join(borrow, 'sid')\
    .select('sname')\
    .filter(F.col('bid')=='0002')\
    .show()

+-----+
|sname|
+-----+
| popo|
| toto|
+-----+



### 4-Trouver les titres de tous les livres empruntés par des étudiants en Mécanique

In [67]:
# SQL
spark.sql("""select title
            from bookSQL
            join borrowSQL
            on bookSQL.bid = borrowSQL.bid
            join StudentSQL
            on borrowSQL.sid = StudentSQL.sid
            where StudentSQL.dept = 'Mécanique' """).show()

+--------------------+
|               title|
+--------------------+
|L'existentialisme...|
+--------------------+



In [68]:
# DSL
book.join(borrow, 'bid')\
    .join(Student, 'sid')\
    .select('title')\
    .filter(F.col('dept')=='Mécanique')\
    .show()

+--------------------+
|               title|
+--------------------+
|L'existentialisme...|
+--------------------+



### 5-Trouver les étudiants qui n’ont jamais emprunté de livre

In [69]:
# SQL
spark.sql("""select sname
            from StudentSQL
            left join borrowSQL
            on StudentSQL.sid = borrowSQL.sid
            where borrowSQL.sid is null""").show()

+-----+
|sname|
+-----+
+-----+



In [70]:
# DSL
Student.join(borrow, Student.sid==borrow.sid, how='left')\
    .select('sname')\
    .filter(F.col('sname').isNull())\
    .show()

+-----+
|sname|
+-----+
+-----+



### 6- Déterminer l’auteur qui a écrit le plus de livres

In [114]:
# SQL 
spark.sql("""select first(name) as auteur, count(distinct bid) as nombre
            from AuthorSQL
            join writeSQL
            on AuthorSQL.aid = writeSQL.aid
            group by name""").show()

+-----------------+------+
|           auteur|nombre|
+-----------------+------+
| Jean Paul Sartre|     2|
|Pierre de Ronsard|     1|
+-----------------+------+



In [109]:
# DSL
Author.join(write, "aid") \
    .distinct() \
    .groupBy("name") \
    .agg(F.count("bid").alias("nombre")) \
    .sort(F.col("nombre").desc()) \
    .select(F.first("name").alias("auteur"),F.first("nombre").alias("nombre")) \
    .show()

+----------------+------+
|          auteur|nombre|
+----------------+------+
|Jean Paul Sartre|     2|
+----------------+------+



### 7- Déterminer les personnes qui n’ont pas encore rendu les livres

In [73]:
# SQL
spark.sql("""select sname
            from StudentSQL
            join borrowSQL
            on StudentSQL.sid = borrowSQL.sid
            where borrowsql.return_time = 'null' """).show()

+-----+
|sname|
+-----+
| toto|
+-----+



In [74]:
# DSL
Student.join(borrow, 'sid')\
    .select('sname')\
    .filter(F.col('return_time')=='null')\
    .show()

+-----+
|sname|
+-----+
| toto|
+-----+



### 8- Créer une nouvelle colonne dans la table borrow qui prend la valeur 1, si la durée d'emprunt est supérieur à 3 mois,  sinon 0.  (utiliser la fonction spark to_date) par la suite faire un export des données en CSV

In [104]:
borrow.withColumn("check_to_date", F.to_date(F.col("checkout_time"), "dd-MM-yyyy"))\
    .withColumn("ret_to_date", F.to_date(F.col("return_time"), "dd-MM-yyyy"))\
    .withColumn("Duree", F.datediff(F.col("ret_to_date"), F.col("check_to_date")))\
    .withColumn("3mois+", (F.when(F.col("Duree")>=90, 1).otherwise(0)))\
    .select("sid", "bid", "checkout_time", "return_time", "3mois+")\
    .write.csv("../Data")

### 9-déterminer les livres qui n’ont jamais été empruntés

In [75]:
# SQL
spark.sql("""select title
            from bookSQL
            left join borrowSQL
            on bookSQL.bid = borrowSQL.bid
            where borrowSQL.bid is null""").show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



In [76]:
# DSL
book.join(borrow, book.bid==borrow.bid, how='left')\
    .select('title')\
    .filter(F.col('sid').isNull())\
    .show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+

