# Librairies

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Instanciation

In [2]:
Spark = SparkSession.builder.master("local").appName("DF").getOrCreate()

# Création des tables

### Author

In [3]:
l1 = [('07890','Jean Paul Sartre'),('05678','Pierre de Ronsard')]
rdd1 = Spark.sparkContext.parallelize(l1)
Author = rdd1.toDF(['aid','name'])
Author.createOrReplaceTempView('AuthorSQL')
Author.show()

+-----+-----------------+
|  aid|             name|
+-----+-----------------+
|07890| Jean Paul Sartre|
|05678|Pierre de Ronsard|
+-----+-----------------+



### book

In [4]:
l2 = [('0001',"L'existentialisme est un humanisme",'Philosophie'),
      ('0002',"Huis clos. Suivi de Les Mouches",'Philosophie'),
      ('0003',"Mignonne allons voir si la rose","Poème"),
      ('0004',"Les Amours","Poème")]
rdd2 = Spark.sparkContext.parallelize(l2)
book = rdd2.toDF(['bid','title','category'])
book.createOrReplaceTempView('bookSQL')
book.show()

+----+--------------------+-----------+
| bid|               title|   category|
+----+--------------------+-----------+
|0001|L'existentialisme...|Philosophie|
|0002|Huis clos. Suivi ...|Philosophie|
|0003|Mignonne allons v...|      Poème|
|0004|          Les Amours|      Poème|
+----+--------------------+-----------+



### Student

In [5]:
l3 = [('S15','toto','Math'),('S16','popo','Eco'),('S17','fofo',"Mécanique")]
rdd3 = Spark.sparkContext.parallelize(l3)
Student = rdd3.toDF(['sid','sname','dept'])
Student.createOrReplaceTempView('StudentSQL')
Student.show()

+---+-----+---------+
|sid|sname|     dept|
+---+-----+---------+
|S15| toto|     Math|
|S16| popo|      Eco|
|S17| fofo|Mécanique|
+---+-----+---------+



### write

In [6]:
l4 = [('07890','0001'),('07890','0002'),('05678','0003'),('05678','0003')]
rdd4 = Spark.sparkContext.parallelize(l4)
write = rdd4.toDF(['aid','bid'])
write.createOrReplaceTempView('writeSQL')
write.show()

+-----+----+
|  aid| bid|
+-----+----+
|07890|0001|
|07890|0002|
|05678|0003|
|05678|0003|
+-----+----+



### borrow

In [7]:
l5 = [('S15','0003','02-01-2020','01-02-2020'),
      ('S15','0002','13-06-2020', None),
      ('S15','0001','13-06-2020','13-10-2020'),
      ('S16','0002','24-01-2020','24-01-2020'),
      ('S17','0001','12-04-2020','01-07-2020')]
rdd5 = Spark.sparkContext.parallelize(l5)
borrow = rdd5.toDF(['sid','bid','checkout-time','return-time'])
borrow.createOrReplaceTempView('borrowSQL')
borrow.show()

+---+----+-------------+-----------+
|sid| bid|checkout-time|return-time|
+---+----+-------------+-----------+
|S15|0003|   02-01-2020| 01-02-2020|
|S15|0002|   13-06-2020|       null|
|S15|0001|   13-06-2020| 13-10-2020|
|S16|0002|   24-01-2020| 24-01-2020|
|S17|0001|   12-04-2020| 01-07-2020|
+---+----+-------------+-----------+



# 1-Trouver les titres de tous les livres que l'étudiant sid='S15' a emprunté. 

### SQL

In [8]:
Spark.sql(''' select sid, title 
                from borrowSQL as bw 
                    join bookSQL as bk 
                        on bk.bid = bw.bid
                where bw.sid == 'S15' ''').show()

+---+--------------------+
|sid|               title|
+---+--------------------+
|S15|Huis clos. Suivi ...|
|S15|Mignonne allons v...|
|S15|L'existentialisme...|
+---+--------------------+



### DSL

In [9]:
book.join(borrow,'bid') \
    .select('sid','title') \
    .filter(F.col('sid')=='S15') \
    .show()

+---+--------------------+
|sid|               title|
+---+--------------------+
|S15|Huis clos. Suivi ...|
|S15|Mignonne allons v...|
|S15|L'existentialisme...|
+---+--------------------+



# 2-Trouver les titres de tous les livres qui n'ont jamais été empruntés par un étudiant.

### SQL

In [10]:
Spark.sql(''' select title
                from bookSQL as bk 
                    left join borrowSQL as bw 
                        on bk.bid = bw.bid
                where bw.`checkout-time` is null ''').show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



### DSL

In [11]:
book.join(borrow, "bid", how="left")\
     .select("title")\
     .filter(F.col("checkout-time").isNull())\
     .show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



# 3-Trouver tous les étudiants qui ont emprunté le livre bid=’0002’.

### SQL

In [12]:
Spark.sql(''' select sname, bid
                from StudentSQL as S
                    join borrowSQL as bw 
                        on bw.sid = S.sid
                where bid == '0002' ''').show()

+-----+----+
|sname| bid|
+-----+----+
| popo|0002|
| toto|0002|
+-----+----+



### DSL

In [13]:
Student.join(borrow, "sid")\
     .select("sname","bid")\
     .filter(F.col("bid") == "0002")\
     .show()

+-----+----+
|sname| bid|
+-----+----+
| popo|0002|
| toto|0002|
+-----+----+



# 4-Trouver les titres de tous les livres empruntés par des étudiants en mécanique (département mécanique).

### SQL

In [14]:
Spark.sql(''' select title
                from (
                    select bw.bid
                        from StudentSQL as S
                            join borrowSQL as bw 
                                on bw.sid = S.sid
                        where dept == "Mécanique"
                    ) req
                    join bookSQL as bk
                        on req.bid = bk.bid ''').show()

+--------------------+
|               title|
+--------------------+
|L'existentialisme...|
+--------------------+



### DSL

In [15]:
req = Student.join(borrow, "sid")\
             .select("bid")\
             .filter(F.col("dept") == "Mécanique")

req.join(book, "bid") \
    .select("title") \
    .show()

+--------------------+
|               title|
+--------------------+
|L'existentialisme...|
+--------------------+



# 5-Trouver les étudiants qui n’ont jamais emprunté de livre. 

### SQL

In [16]:
Spark.sql(''' select sname
                from StudentSQL as S 
                    left join borrowSQL as bw 
                        on S.sid = bw.sid
                where S.sid is null ''').show()

+-----+
|sname|
+-----+
+-----+



### DSL

In [17]:
Student.join(borrow, "sid", how="left")\
     .select("sname")\
     .filter(F.col("sid").isNull())\
     .show()

+-----+
|sname|
+-----+
+-----+



# 6-Déterminer l’auteur qui a écrit le plus de livres.

### SQL

In [18]:
Spark.sql(''' select first(name) as name, first(nb) as nb
                from (
                    select name, count(DISTINCT bid) as nb
                        from AuthorSQL as A 
                            join writeSQL as w 
                                on A.aid = w.aid
                        group by A.name 
                    ) req
                order by nb desc ''').show()


+----------------+---+
|            name| nb|
+----------------+---+
|Jean Paul Sartre|  2|
+----------------+---+



### DSL

In [19]:
Author.join(write, "aid") \
    .distinct() \
    .groupBy("name") \
    .agg(F.count("bid").alias("nb")) \
    .sort(F.col("nb").desc()) \
    .select(F.first("name").alias("name"),F.first("nb").alias("nb")) \
    .show()

+----------------+---+
|            name| nb|
+----------------+---+
|Jean Paul Sartre|  2|
+----------------+---+



# 7-Déterminer les personnes qui n’ont pas encore rendu les livres.

### SQL

In [20]:
Spark.sql(''' select sname
                from StudentSQL as S 
                    left join borrowSQL as bw 
                        on S.sid = bw.sid
                where bw.`return-time` is null ''').show()

+-----+
|sname|
+-----+
| toto|
+-----+



### DSL

In [21]:
Student.join(borrow, "sid", how="left")\
     .select("sname")\
     .filter(F.col("return-time").isNull())\
     .show()

+-----+
|sname|
+-----+
| toto|
+-----+



# 8-Créer une nouvelle colonne dans la table borrow qui prend la valeur 1, si la durée d'emprunt est supérieur à 3 mois,  sinon 0.    
(utiliser la fonction spark to_date) par la suite faire un export des données en CSV. dans un répertoire nommé contention (Attention pas de path en dur ! )

### SQL     
On considère que les valeurs nulles dans le return-time considèrent que le livre n'a pas été rendu à l'heure où la base de données a été utilisée. On prend donc la date actuelle.

In [22]:
Spark.sql(''' select req.sid, req.bid, req.`checkout-time`, req.`return-time`,
                    (case
                       when datediff(req.`return-time`, req.`checkout-time`) >= 90 then '1'
                       else '0'
                    end) as Plus_3_mois
                from (select sid, bid, to_date(`checkout-time`, 'd-M-y') as `checkout-time`,
                        (case 
                            when `return-time` is null then date(now())
                            else to_date(`return-time`, 'd-M-y')
                        end) as `return-time`
                        from borrowSQL ) as req ''').write.csv(path = "./contention", mode = "overwrite", header = 'true')

### DSL

In [23]:
borrow.withColumn("checkout-time", F.to_date(F.col("checkout-time"), "dd-MM-yyyy")) \
    .withColumn("return-time", F.to_date(F.col("return-time"), "dd-MM-yyyy")) \
    .withColumn("return-time", (F.when(F.col("return-time").isNull(), F.current_date()).otherwise(F.col("return-time")))) \
    .withColumn("Duree", F.datediff(F.col("return-time"), F.col("checkout-time"))) \
    .withColumn("plus_3_mois", (F.when(F.col("Duree")>=90, 1).otherwise(0))) \
    .select("sid", "bid", "checkout-time", "return-time", "plus_3_mois") \
    .write.csv(path = "./contention", mode = "overwrite", header = 'true')

# 9-Déterminer les livres qui n’ont jamais été empruntés. 

### SQL

In [24]:
Spark.sql(''' select title
                from bookSQL as bk 
                    left join borrowSQL as bw 
                        on bk.bid = bw.bid
                where bw.`checkout-time` is null ''').show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



### DSL

In [25]:
book.join(borrow, "bid", how="left")\
     .select("title")\
     .filter(F.col("checkout-time").isNull())\
     .show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



# Fermeture instanciation

In [26]:
Spark.stop()