   # PROJET EXAMEN TERMINAL: OUTILS BIG DATA

##### Importation des librairies

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf 
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType



#### Instanciation de la Spark Session

In [130]:
# Single-process local session used by every cell below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("gestion d’emprunt")
    .getOrCreate()
)

#### Question 0: Création des tables

In [134]:
# Build the Author table: one (aid, name) tuple per author.
l1 = [
    ('07890', 'Jean Paul Sartre'),
    ('05678', 'Pierre de Ronsard'),
]
rdd1 = spark.sparkContext.parallelize(l1)  # Python list -> RDD
Author = rdd1.toDF(['aid', 'name'])  # RDD -> DataFrame with named columns
# Register the DataFrame as a temporary view so spark.sql() can query it as AuthorSQL.
Author.createOrReplaceTempView('AuthorSQL')
Author.show()
Author.dtypes  # last expression of the cell: list of (column name, type) pairs

+-----+-----------------+
|  aid|             name|
+-----+-----------------+
|07890| Jean Paul Sartre|
|05678|Pierre de Ronsard|
+-----+-----------------+



[('aid', 'string'), ('name', 'string')]

In [135]:
# Build the Book table: one (bid, title, category) tuple per book.
l2 = [
    ('0001', "L'existentialisme est un humanisme", 'Philosophie'),
    ('0002', 'Huis clos. Suivi de Les Mouches', 'Philosophie'),
    ('0003', 'Mignonne allons voir si la rose', 'Poème'),
    ('0004', 'Les Amours', 'Poème'),
]
rdd2 = spark.sparkContext.parallelize(l2)  # Python list -> RDD
Book = rdd2.toDF(['bid', 'title', 'category'])  # RDD -> DataFrame with named columns
# Register the DataFrame as a temporary view so spark.sql() can query it as BookSQL.
Book.createOrReplaceTempView('BookSQL')
Book.show()
Book.dtypes  # last expression of the cell: list of (column name, type) pairs

+----+--------------------+-----------+
| bid|               title|   category|
+----+--------------------+-----------+
|0001|L'existentialisme...|Philosophie|
|0002|Huis clos. Suivi ...|Philosophie|
|0003|Mignonne allons v...|      Poème|
|0004|          Les Amours|      Poème|
+----+--------------------+-----------+



[('bid', 'string'), ('title', 'string'), ('category', 'string')]

In [5]:
# Build the Student table: one (sid, sname, dept) tuple per student.
l3 = [
    ('S15', 'toto', 'Math'),
    ('S16', 'popo', 'Eco'),
    ('S17', 'fofo', 'Mécanique'),
]
rdd3 = spark.sparkContext.parallelize(l3)  # Python list -> RDD
Student = rdd3.toDF(['sid', 'sname', 'dept'])  # RDD -> DataFrame with named columns
# Register the DataFrame as a temporary view so spark.sql() can query it as StudentSQL.
Student.createOrReplaceTempView('StudentSQL')
Student.show()
Student.dtypes  # last expression of the cell: list of (column name, type) pairs

+---+-----+---------+
|sid|sname|     dept|
+---+-----+---------+
|S15| toto|     Math|
|S16| popo|      Eco|
|S17| fofo|Mécanique|
+---+-----+---------+



In [137]:
# Build the Write table: one (aid, bid) tuple per author/book link.
# NOTE(review): ('05678', '0003') appears twice and no row references book '0004';
# possibly a typo for ('05678', '0004') - confirm against the exam statement.
l4 = [
    ('07890', '0001'),
    ('07890', '0002'),
    ('05678', '0003'),
    ('05678', '0003'),
]
rdd4 = spark.sparkContext.parallelize(l4)  # Python list -> RDD
Write = rdd4.toDF(['aid', 'bid'])  # RDD -> DataFrame with named columns
# Register the DataFrame as a temporary view so spark.sql() can query it as WriteSQL.
Write.createOrReplaceTempView('WriteSQL')
Write.show()
Write.dtypes  # last expression of the cell: list of (column name, type) pairs

+-----+----+
|  aid| bid|
+-----+----+
|07890|0001|
|07890|0002|
|05678|0003|
|05678|0003|
+-----+----+



[('aid', 'string'), ('bid', 'string')]

In [136]:
# Build the Borrow table: one (sid, bid, checkout_time, return_time) tuple per loan.
# Dates are kept as 'dd-MM-yyyy' strings; return_time is None when the loan has no return date.
l5 = [
    ('S15', '0003', '02-01-2020', '01-02-2020'),
    ('S15', '0002', '13-06-2020', None),
    ('S15', '0001', '13-06-2020', '13-10-2020'),
    ('S16', '0002', '24-01-2020', '24-01-2020'),
    ('S17', '0001', '12-04-2020', '01-07-2020'),
]
rdd5 = spark.sparkContext.parallelize(l5)  # Python list -> RDD
Borrow = rdd5.toDF(['sid', 'bid', 'checkout_time', 'return_time'])  # RDD -> DataFrame
# Register the DataFrame as a temporary view so spark.sql() can query it as BorrowSQL.
Borrow.createOrReplaceTempView('BorrowSQL')
Borrow.show()
Borrow.dtypes  # last expression of the cell: list of (column name, type) pairs

+---+----+-------------+-----------+
|sid| bid|checkout_time|return_time|
+---+----+-------------+-----------+
|S15|0003|   02-01-2020| 01-02-2020|
|S15|0002|   13-06-2020|       null|
|S15|0001|   13-06-2020| 13-10-2020|
|S16|0002|   24-01-2020| 24-01-2020|
|S17|0001|   12-04-2020| 01-07-2020|
+---+----+-------------+-----------+



[('sid', 'string'),
 ('bid', 'string'),
 ('checkout_time', 'string'),
 ('return_time', 'string')]

In [86]:
# DataFrame API (DSL): titles of the books borrowed by student S15.
# NOTE(review): presumably "Question 1" - its heading is missing from the notebook.
(
    Borrow.join(Book, ['bid'])
    .select('sid', 'title')
    .filter(F.col("sid") == 'S15')
    .show()
)

+---+--------------------+
|sid|               title|
+---+--------------------+
|S15|Huis clos. Suivi ...|
|S15|Mignonne allons v...|
|S15|L'existentialisme...|
+---+--------------------+



In [87]:
# SQL, variant 1: explicit JOIN ... ON between the loans and the books.
q1_explicit_join = """select sid, title
             from BorrowSQL as t1 
             join BookSQL as t2 
             on t1.bid=t2.bid 
             where t1.sid='S15' """
spark.sql(q1_explicit_join).show()

# SQL, variant 2: same result, join condition written in the WHERE clause.
q1_implicit_join = """select sid, title
             from BorrowSQL as t1,
             BookSQL as t2 
             where t1.bid=t2.bid and t1.sid='S15' """
spark.sql(q1_implicit_join).show()

+---+--------------------+
|sid|               title|
+---+--------------------+
|S15|Huis clos. Suivi ...|
|S15|Mignonne allons v...|
|S15|L'existentialisme...|
+---+--------------------+

+---+--------------------+
|sid|               title|
+---+--------------------+
|S15|Huis clos. Suivi ...|
|S15|Mignonne allons v...|
|S15|L'existentialisme...|
+---+--------------------+



#### Question 2: les titres de tous les livres qui n'ont jamais été empruntés par un étudiant.

In [88]:
# DataFrame API (DSL): a left outer join keeps every book; books without any
# loan get a null sid, and those are the ones that were never borrowed.
never_borrowed = (
    Book.join(Borrow, ['bid'], how='left_outer')
    .filter(F.col("sid").isNull())
)
never_borrowed.select('bid', 'title').show()


+----+----------+
| bid|     title|
+----+----------+
|0004|Les Amours|
+----+----------+



In [89]:
# SQL: books that were never borrowed.
# A left outer join keeps every book; rows with no matching loan have a null t2.sid.
# Uses the standard '=' comparison and a qualified column, like the other queries in this notebook.
spark.sql("""select t1.bid, t1.title
             from BookSQL as t1
             left outer join BorrowSQL as t2
             on t1.bid = t2.bid
             where t2.sid is null""").show()

+----+----------+
| bid|     title|
+----+----------+
|0004|Les Amours|
+----+----------+



#### Question 3: les étudiants qui ont emprunté le livre bid=’0002’

In [90]:
# DataFrame API (DSL): students who borrowed the book with bid '0002'.
borrowers_of_0002 = (
    Student.join(Borrow, ['sid'], how='inner')
    .filter(F.col("bid") == '0002')
)
borrowers_of_0002.select('sid', 'sname', 'bid').show()


+---+-----+----+
|sid|sname| bid|
+---+-----+----+
|S16| popo|0002|
|S15| toto|0002|
+---+-----+----+



In [91]:
# SQL: students who borrowed the book with bid '0002'.
# Uses the standard '=' comparison, like the other queries in this notebook.
spark.sql("""select t1.sid, t1.sname, t2.bid
             from StudentSQL as t1
             join BorrowSQL as t2
             on t1.sid = t2.sid
             where t2.bid = '0002' """).show()

+---+-----+----+
|sid|sname| bid|
+---+-----+----+
|S16| popo|0002|
|S15| toto|0002|
+---+-----+----+



#### Question 4: les titres de tous les livres empruntés par des étudiants en Mécanique (département Mécanique)

In [92]:
# DataFrame API (DSL): books borrowed by students of the 'Mécanique' department.
# Book -> Borrow on bid, then -> Student on sid, then keep the matching department.
(
    Book.join(Borrow, 'bid')
    .join(Student, 'sid')
    .select(Book.bid, 'title', Student.dept)
    .filter(F.col('dept') == 'Mécanique')
    .show()
)

+----+--------------------+---------+
| bid|               title|     dept|
+----+--------------------+---------+
|0001|L'existentialisme...|Mécanique|
+----+--------------------+---------+



In [96]:
# SQL: books borrowed by students of the 'Mécanique' department.
# View names are written exactly as registered (BookSQL, BorrowSQL, StudentSQL) so the
# queries do not depend on case-insensitive name resolution.

# Variant 1: explicit JOIN ... ON.
spark.sql("""select t1.bid, t1.title, t3.dept
            from BookSQL as t1
            join BorrowSQL as t2
            on t1.bid = t2.bid
            join StudentSQL as t3
            on t2.sid = t3.sid
            where t3.dept = 'Mécanique' """).show()

# Variant 2: join conditions written in the WHERE clause.
# The original author considered this form more resource-hungry.
# NOTE(review): Spark's optimizer normally plans both variants the same way;
# compare the two with .explain() before relying on that claim.
spark.sql("""select t1.bid, t1.title, t3.dept
            from BookSQL as t1,
            BorrowSQL as t2,
            StudentSQL as t3
            where t1.bid = t2.bid and
            t2.sid = t3.sid and
            t3.dept = 'Mécanique' """).show()

+----+--------------------+---------+
| bid|               title|     dept|
+----+--------------------+---------+
|0001|L'existentialisme...|Mécanique|
+----+--------------------+---------+

+----+--------------------+---------+
| bid|               title|     dept|
+----+--------------------+---------+
|0001|L'existentialisme...|Mécanique|
+----+--------------------+---------+



#### Question 5: les étudiants qui n’ont jamais emprunté de livre

In [97]:
# DataFrame API (DSL): students who never borrowed a book.
# The left join keeps every student; a student without any loan has nulls in all
# Borrow columns. The null test must therefore target the Borrow side of the join:
# sname comes from Student and is never null here, so testing it always returned
# an empty result regardless of the data.
Student.join(Borrow, Student.sid == Borrow.sid, how='left')\
    .filter(Borrow.sid.isNull())\
    .select(Student.sid, 'sname')\
    .show()

+---+-----+
|sid|sname|
+---+-----+
+---+-----+



In [98]:
# SQL: students who never borrowed a book.
# A left join keeps every student; those without any loan have a null t2.sid.
# The view name is written exactly as registered (BorrowSQL).
spark.sql("""select t1.sid, t1.sname
            from StudentSQL as t1
            left join BorrowSQL as t2
            on t1.sid = t2.sid
            where t2.sid is null""").show()

+---+-----+
|sid|sname|
+---+-----+
+---+-----+



#### Question 6: Déterminer l’auteur qui a écrit le plus de livres

In [99]:
# DataFrame API (DSL): the author who wrote the most books.
# distinct() removes repeated (aid, name, bid) rows before counting, so a book
# listed twice for the same author is counted once.
# orderBy(...).limit(1) takes the top row explicitly; the previous
# sort + select(F.first(...)) relied on row order being preserved into a global
# aggregate, which Spark does not guarantee.
# If several authors tie for first place, only one of them is shown.
Author.join(Write, "aid") \
    .distinct() \
    .groupBy("aid", "name") \
    .agg(F.count("bid").alias("nombre")) \
    .orderBy(F.col("nombre").desc()) \
    .limit(1) \
    .show()

+-----+----------------+------+
|  aid|            name|nombre|
+-----+----------------+------+
|07890|Jean Paul Sartre|     2|
+-----+----------------+------+



In [138]:
# SQL: the author who wrote the most books.
# count(distinct ...) counts each book once per author even if the link is duplicated.
# The view name is written exactly as registered (WriteSQL) and bid is qualified.
# If several authors tie for first place, LIMIT 1 shows only one of them.
spark.sql("""select t2.aid, t2.name, count(distinct t1.bid) as nombre
            from WriteSQL as t1
            join AuthorSQL as t2
            on t1.aid = t2.aid
            group by t2.aid, t2.name
            order by nombre desc
            LIMIT 1""").show()

+-----+----------------+------+
|  aid|            name|nombre|
+-----+----------------+------+
|07890|Jean Paul Sartre|     2|
+-----+----------------+------+



#### Question 7: les personnes qui n’ont pas encore rendu leurs livres

In [106]:
# SQL: students who still hold a book (loan with no return date).
# The view name is written exactly as registered (BorrowSQL).
# One row is produced per unreturned loan, so a student with several
# unreturned books appears several times.
spark.sql("""select t1.sid, t1.sname
            from StudentSQL as t1
            join BorrowSQL as t2
            on t1.sid = t2.sid
            where t2.return_time is null """).show()

+---+-----+
|sid|sname|
+---+-----+
|S15| toto|
+---+-----+



In [107]:
# DataFrame API (DSL): students who still hold a book (loan with no return date).
# The filter runs before the projection: return_time is not part of the selected
# columns, so filtering after select() depended on Spark re-resolving a dropped column.
Student.join(Borrow, 'sid')\
    .filter(F.col('return_time').isNull())\
    .select('sid', 'sname')\
    .show()

+---+-----+
|sid|sname|
+---+-----+
|S15| toto|
+---+-----+



#### Question 8: Création d'une nouvelle colonne dans la table Borrow qui prend la valeur 1 si la durée d'emprunt est supérieure à 3 mois, sinon 0 (utiliser la fonction Spark to_date). Par la suite, faire un export des données en CSV.

In [146]:
# Question 8: flag loans that lasted more than 3 months.
# The string dates ('dd-MM-yyyy') are parsed with to_date, then compared in calendar
# months with months_between rather than with a fixed 90-day threshold, which does
# not match "strictly more than 3 months" (e.g. 1 Dec -> 1 Mar is 90 days but
# exactly 3 months).
checkout_date = F.to_date(F.col("checkout_time"), "dd-MM-yyyy")
return_date = F.to_date(F.col("return_time"), "dd-MM-yyyy")

# A loan with no return_time yields a null duration; the comparison is then not
# true and the flag falls through to 0.
# NOTE(review): confirm that unreturned loans should be flagged 0 rather than
# measured against the current date.
df = Borrow.withColumn(
    "3mois+",
    F.when(F.months_between(return_date, checkout_date) > 3, 1).otherwise(0),
).select("sid", "bid", "checkout_time", "return_time", "3mois+")

In [152]:
# Export the flagged loans to CSV through pandas (the data is collected on the driver,
# which is fine for a table this small).
# index=False keeps the pandas row index out of the file; without it the CSV gets
# an extra unnamed first column that is not part of the Borrow data.
df.toPandas().to_csv("../Data/data_file.csv", header=True, index=False)

#### Question 9: Déterminer les livres qui n’ont jamais été empruntés

In [122]:
# SQL: books that were never borrowed.
# A left join keeps every book; those without any loan have a null t2.sid.
# View names are written exactly as registered (BookSQL, BorrowSQL).
spark.sql("""select t1.bid, t1.title
            from BookSQL as t1
            left join BorrowSQL as t2
            on t1.bid = t2.bid
            where t2.sid is null""").show()

+----+----------+
| bid|     title|
+----+----------+
|0004|Les Amours|
+----+----------+



In [124]:
# DataFrame API (DSL): books that were never borrowed.
# The left join keeps every book; those without any loan have a null sid.
# The filter runs before the projection: sid is not part of the selected columns,
# so filtering after select() depended on Spark re-resolving a dropped column.
Book.join(Borrow, Book.bid == Borrow.bid, how='left')\
    .filter(F.col('sid').isNull())\
    .select(Book.bid, 'title')\
    .show()

+----+----------+
| bid|     title|
+----+----------+
|0004|Les Amours|
+----+----------+



In [128]:
# Stop the SparkSession at the end of the notebook; cells using `spark` cannot be
# re-run after this without creating the session again.
spark.stop()