In [1]:
import findspark
findspark.init("D:/spark")
import pyspark 
findspark.find()

'D:/spark'

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

In [3]:
spark = SparkSession.builder.master("local").appName("Projet final").getOrCreate()

## Création des bases de données  

In [4]:
# création de la base Author
l1 = [('07890','Jean Paul Sartre'),('05678','Pierre de Ronsard')]
Author = spark.sparkContext.parallelize(l1).toDF(['aid','name'])
Author.createOrReplaceTempView('AuthorSQL') 
Author.show()

+-----+-----------------+
|  aid|             name|
+-----+-----------------+
|07890| Jean Paul Sartre|
|05678|Pierre de Ronsard|
+-----+-----------------+



In [5]:
#création de la base book
l2 = [("0001","L'existentialisme est un humanisme","Philosophie"),
      ("0002","Huis clos. Suivi de Les Mouches","Philosophie"),
     ("0003","Mignonne allons voir si la rose","Poeme"),
     ("0004","Les Amours","Poème")]
book = spark.sparkContext.parallelize(l2).toDF(['bid','title','category'])
book.createOrReplaceTempView('bookSQL') 
book.show()

+----+--------------------+-----------+
| bid|               title|   category|
+----+--------------------+-----------+
|0001|L'existentialisme...|Philosophie|
|0002|Huis clos. Suivi ...|Philosophie|
|0003|Mignonne allons v...|      Poeme|
|0004|          Les Amours|      Poème|
+----+--------------------+-----------+



In [6]:
#création de la base student
l3 = [("S15","toto","Math"),("S16","popo","Eco"),
     ("S17","fofo","Mécanique")]
Student = spark.sparkContext.parallelize(l3).toDF(['sid','sname','dept']) 
Student.createOrReplaceTempView('StudentSQL') 
Student.show()

+---+-----+---------+
|sid|sname|     dept|
+---+-----+---------+
|S15| toto|     Math|
|S16| popo|      Eco|
|S17| fofo|Mécanique|
+---+-----+---------+



In [7]:
#création de la base student
l4 = [("07890","0001"),("07890","0002"),
     ("05678","0003"),("05678","0004")]
write = spark.sparkContext.parallelize(l4).toDF(['aid','bid'])
write.createOrReplaceTempView('writeSQL') 
write.show()

+-----+----+
|  aid| bid|
+-----+----+
|07890|0001|
|07890|0002|
|05678|0003|
|05678|0004|
+-----+----+



In [8]:
#création de la base borrow
l4 = [("S15","0003","02-01-2020","01-02-2020"),("S15","0002","13-06-2020","null"),
     ("S15","0001","13-06-2020","13-10-2020"),("S16","0002","24-01-2020","24-01-2020"),
     ("S17","0001","12-04-2020","01-07-2020")]
borrow = spark.sparkContext.parallelize(l4).toDF(['sid','bid','checkout-time','return-time'])
borrow.createOrReplaceTempView('borrowSQL') 
borrow.show()

+---+----+-------------+-----------+
|sid| bid|checkout-time|return-time|
+---+----+-------------+-----------+
|S15|0003|   02-01-2020| 01-02-2020|
|S15|0002|   13-06-2020|       null|
|S15|0001|   13-06-2020| 13-10-2020|
|S16|0002|   24-01-2020| 24-01-2020|
|S17|0001|   12-04-2020| 01-07-2020|
+---+----+-------------+-----------+



## 1-Trouver les titres de tous les livres que l'étudiant sid='S15' a emprunté.

In [9]:
# DSL
borrow.join(book,'bid') \
          .select('title') \
          .filter(F.col('sid')=='S15') \
          .show()

+--------------------+
|               title|
+--------------------+
|Huis clos. Suivi ...|
|Mignonne allons v...|
|L'existentialisme...|
+--------------------+



In [10]:
#SQL
spark.sql("""
select distinct (title)
from bookSQL, borrowSQL 
where  bookSQL.bid = borrowSQL.bid
and borrowSQL.sid='S15'
""").show()

+--------------------+
|               title|
+--------------------+
|L'existentialisme...|
|Huis clos. Suivi ...|
|Mignonne allons v...|
+--------------------+



## 2-Trouver les titres de tous les livres qui n'ont jamais été empruntés par un étudiant.


In [11]:
# DSL
borrow.join(book,'bid',how = "right") \
          .filter(F.col('sid').isNull()) \
          .select('title') \
          .show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



In [12]:
#SQL
spark.sql("""
SELECT  distinct (title)
FROM bookSQL
WHERE  title NOT IN (SELECT distinct title
FROM bookSQL B, borrowSQL C, StudentSQL S
WHERE B.bid = C.bid
AND S.sid=C.sid)
""").show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



## 3-Trouver tous les étudiants qui ont emprunté le livre bid=’0002’

In [13]:
# DSL
borrow.join(Student,['sid']) \
          .filter(F.col('bid')=="0002")\
          .select('sname') \
          .show()

+-----+
|sname|
+-----+
| popo|
| toto|
+-----+



In [14]:
#SQL
spark.sql("""
SELECT distinct (sname) 
From StudentSQL ,borrowSQL
where StudentSQL.sid=borrowSQL.sid
And borrowSQL.bid='0002'
""").show()

+-----+
|sname|
+-----+
| popo|
| toto|
+-----+



## 4-Trouver les titres de tous les livres empruntés par des étudiants en Mécanique (département Mécanique) (ERRATUM !!)


In [15]:
# DSL
borrow.join(Student,['sid']) \
          .filter(F.col('dept')=="Mécanique")\
          .join(book,['bid'])\
          .select('title') \
          .show()

+--------------------+
|               title|
+--------------------+
|L'existentialisme...|
+--------------------+



In [16]:
#SQL
spark.sql("""
select title 
from bookSQL, borrowSQL, StudentSQL
where borrowSQL.bid = bookSQL.bid
and borrowSQL.sid = StudentSQL.sid
and StudentSQL.dept = 'Mécanique'
""").show()

+--------------------+
|               title|
+--------------------+
|L'existentialisme...|
+--------------------+



## 5-Trouver les étudiants qui n’ont jamais emprunté de livre. 

In [17]:
# DSL
borrow.join(Student,['sid'],how = "right") \
          .filter(F.col('sid').isNull()) \
          .select('sname') \
          .show()

+-----+
|sname|
+-----+
+-----+



In [18]:
#SQL
spark.sql("""
select distinct(sname)
from StudentSQL
where StudentSQL.sid not in (select sid from borrowSQL)
""").show()

+-----+
|sname|
+-----+
+-----+



## 6- Déterminer l’auteur qui a écrit le plus de livres. 

In [19]:
# DSL
Livres = book.join(write,['bid']) \
          .join(Author,['aid'])\
          .groupBy('name')\
          .agg(F.count('name').alias("Nombre de livres"))

Nb_livres = Livres \
    .agg(F.max('Nombre de livres').alias("Nombre de livres"))

Livres.join(Nb_livres, 'Nombre de livres')\
     .show(1) 

+----------------+----------------+
|Nombre de livres|            name|
+----------------+----------------+
|               2|Jean Paul Sartre|
+----------------+----------------+
only showing top 1 row



In [20]:
#SQL
spark.sql("""
select count(distinct writeSQL.bid) as Nombre_livre, name
from AuthorSQL, writeSQL
where AuthorSQL.aid=writeSQL.aid
group by name
order by Nombre_livre desc
""").show(1)

+------------+----------------+
|Nombre_livre|            name|
+------------+----------------+
|           2|Jean Paul Sartre|
+------------+----------------+
only showing top 1 row



## 7- Déterminer les personnes qui n’ont pas encore rendu les livres.

In [21]:
# DSL
borrow.join(Student,['sid'],how = "right") \
          .filter(F.col('return-time')== 'null') \
          .select('sname') \
          .show()

+-----+
|sname|
+-----+
| toto|
+-----+



In [22]:
#SQL
spark.sql("""
select sname
from StudentSQL, borrowSQL
where StudentSQL.sid=borrowSQL.sid
and borrowSQL.`return-time`= 'null'
""").show()

+-----+
|sname|
+-----+
| toto|
+-----+



## 8- Créer une nouvelle colonne dans la table borrow qui prend la valeur 1, si la durée d'emprunt est supérieur à 3 mois,  sinon 0.  (utiliser la fonction spark to_date) par la suite faire un export des données en CSV. dans un répertoire nommé contention (Attention pas de path en dur!)


In [27]:
bdd = borrow.withColumn("start_emprunt", F.to_date(F.col("`checkout-time`"), "dd-MM-yyyy"))\
    .withColumn("end_emprunt", F.to_date(F.col("`return-time`"), "dd-MM-yyyy"))\
    .withColumn("Duree", F.datediff(F.col("end_emprunt"), F.col("start_emprunt")))\
    .withColumn("plus que 3 mois",(F.when((F.col("Duree").isNull()) | (F.col("Duree")>= 90), 1).otherwise(0)))\
    .select("sid", "bid", "`checkout-time`", "`return-time`", "plus que 3 mois")

In [28]:
bdd.toPandas().to_csv("data_file.csv", header=True)

## 9-déterminer les livres qui n’ont jamais été empruntés. 

In [29]:
# DSL
borrow.join(book,['bid'],how = "right") \
          .filter(F.col('checkout-time').isNull()) \
          .select('title') \
          .show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



In [30]:
#SQL
spark.sql("""
select distinct title 
from bookSQL, borrowSQL
where bookSQL.bid not in (select bid from borrowSQL)
""").show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



In [31]:
# Stopping SparkSession
spark.stop()