In [1]:
## Import 

import findspark
import py4j
import numpy as np
import pandas as pd
findspark.init("D:\logiciels\spark")


from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col, when
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import datediff, to_date


In [2]:
spark = SparkSession.builder.appName('Big_data').getOrCreate()
sc = spark.sparkContext # Lancement de la session Spark

In [3]:
Author = sc.parallelize([["07890", "Jean Paul Sartre"], ["05678", "Pierre de Ronsard"]]).toDF(("aid", "name"))
Author.show()
Author.createOrReplaceTempView("Author_sql")

+-----+-----------------+
|  aid|             name|
+-----+-----------------+
|07890| Jean Paul Sartre|
|05678|Pierre de Ronsard|
+-----+-----------------+



In [4]:
book = sc.parallelize([["0001", "L'existentialisme est un humanisme", "Philosophie"], ["0002", "Huis clos. Suivi de Les Mouches", "Philosophie"], ["0003", "Mignonne allons voir si la rose", "Poeme"], ["0004", "Les Amours", "Poème"]]).toDF(("bid", "title", "category"))
book.show()
book.createOrReplaceTempView("book_sql")

+----+--------------------+-----------+
| bid|               title|   category|
+----+--------------------+-----------+
|0001|L'existentialisme...|Philosophie|
|0002|Huis clos. Suivi ...|Philosophie|
|0003|Mignonne allons v...|      Poeme|
|0004|          Les Amours|      Poème|
+----+--------------------+-----------+



In [5]:
Student = sc.parallelize([["S15", "toto", "Math"], ["S16", "popo", "Eco"], ["S17", "fofo", "Mécanique"]]).toDF(("sid", "sname", "dept"))
Student.show()
Student.createOrReplaceTempView("Student_sql")

+---+-----+---------+
|sid|sname|     dept|
+---+-----+---------+
|S15| toto|     Math|
|S16| popo|      Eco|
|S17| fofo|Mécanique|
+---+-----+---------+



In [6]:
write = sc.parallelize([["07890", "0001"], ["07890", "0002"], ["05678", "0003"], ["05678", "0003"]]).toDF(("aid", "bid"))
write.show()
write.createOrReplaceTempView("write_sql")

+-----+----+
|  aid| bid|
+-----+----+
|07890|0001|
|07890|0002|
|05678|0003|
|05678|0003|
+-----+----+



In [7]:
borrow = sc.parallelize([["S15", "0003", "02-01-2020","01-02-2020"], ["S15", "0002", "13-06-2020",None], ["S15", "0001", "13-06-2020","13-10-2020"], ["S16", "0002", "24-01-2020","24-01-2020"],["S17", "0001", "12-04-2020","01-07-2020"]]).toDF(("sid", "bid", "checkout_time", "return_time")) 
borrow.show() 
borrow.createOrReplaceTempView("borrow_sql")

+---+----+-------------+-----------+
|sid| bid|checkout_time|return_time|
+---+----+-------------+-----------+
|S15|0003|   02-01-2020| 01-02-2020|
|S15|0002|   13-06-2020|       null|
|S15|0001|   13-06-2020| 13-10-2020|
|S16|0002|   24-01-2020| 24-01-2020|
|S17|0001|   12-04-2020| 01-07-2020|
+---+----+-------------+-----------+



# 1-Trouver les titres de tous les livres que l'étudiant sid='S15' a emprunté.

## DSL

In [8]:
borrow.join(book, "bid")\
    .select("title")\
    .filter(col("sid") == "S15")\
    .show()

+--------------------+
|               title|
+--------------------+
|Huis clos. Suivi ...|
|Mignonne allons v...|
|L'existentialisme...|
+--------------------+



## SQL

In [9]:
spark.sql("""SELECT book_sql.title FROM borrow_sql 
    LEFT JOIN book_sql 
    ON borrow_sql.bid = book_sql.bid 
    WHERE borrow_sql.sid = 'S15' """).show()

+--------------------+
|               title|
+--------------------+
|Huis clos. Suivi ...|
|Mignonne allons v...|
|L'existentialisme...|
+--------------------+



# 2-Trouver les titres de tous les livres qui n'ont jamais été empruntés par un étudiant.

## DSL

In [10]:
borrow.join(book, "bid",how="right")\
        .select("title")\
        .filter(col("checkout_time").isNull())\
        .show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



## SQL

In [11]:
spark.sql("""SELECT book_sql.title FROM book_sql 
    LEFT JOIN borrow_sql 
    ON borrow_sql.bid = book_sql.bid 
    WHERE borrow_sql.bid IS NULL """).show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



## 3-Trouver tous les étudiants qui ont emprunté le livre bid=’002’.

# DSL

In [12]:
Student.join(borrow, "sid")\
    .select("sname")\
    .filter(col("bid") == "0002")\
    .show()

+-----+
|sname|
+-----+
| popo|
| toto|
+-----+



## SQL

In [13]:
spark.sql("""SELECT sname FROM Student_sql 
    LEFT JOIN borrow_sql 
    ON Student_sql.sid = borrow_sql.sid 
    WHERE borrow_sql.bid = '0002' """).show()

+-----+
|sname|
+-----+
| popo|
| toto|
+-----+



# 4-Trouver les titres de tous les livres empruntés par des étudiants en mécanique.

## DSL

In [14]:
Student.join(borrow, "sid")\
    .join(book, "bid")\
    .select("sid", "dept", "title")\
    .filter(col("dept") == "Mécanique")\
    .show()

+---+---------+--------------------+
|sid|     dept|               title|
+---+---------+--------------------+
|S17|Mécanique|L'existentialisme...|
+---+---------+--------------------+



## SQL

In [15]:
spark.sql(""" SELECT Student_sql.sid, dept, title FROM Student_sql 
    LEFT JOIN borrow_sql 
    ON borrow_sql.sid = Student_sql.sid
    LEFT JOIN book_sql 
    ON borrow_sql.bid = book_sql.bid 
    WHERE Student_sql.dept = 'Mécanique' """).show()

+---+---------+--------------------+
|sid|     dept|               title|
+---+---------+--------------------+
|S17|Mécanique|L'existentialisme...|
+---+---------+--------------------+



# 5-Trouver les étudiants qui n’ont jamais emprunté de livre.

## DSL

In [16]:
Student.join(borrow, "sid")\
    .select("sname")\
    .filter(col("checkout_time").isNull())\
    .show()

+-----+
|sname|
+-----+
+-----+



## SQL

In [17]:
spark.sql("""SELECT sname FROM Student_sql 
    LEFT JOIN borrow_sql 
    ON Student_sql.sid = borrow_sql.sid 
    WHERE borrow_sql.checkout_time IS NULL""").show()

+-----+
|sname|
+-----+
+-----+



# 6- Déterminer l’auteur qui a écrit le plus de livres.

## DSL

In [18]:
Author.join(write, "aid", how="left")\
        .distinct()\
        .groupBy("name")\
        .agg(F.count(col("name")))\
        .sort("name",ascending = True)\
        .limit(1)\
        .show()

+----------------+-----------+
|            name|count(name)|
+----------------+-----------+
|Jean Paul Sartre|          2|
+----------------+-----------+



## SQL

In [19]:
spark.sql(""" SELECT name, COUNT(DISTINCT write_sql.bid) as nb FROM Author_sql
    LEFT JOIN write_sql 
    ON Author_sql.aid = write_sql.aid 
    GROUP BY name
    ORDER BY nb desc LIMIT 1 """).show()

+----------------+---+
|            name| nb|
+----------------+---+
|Jean Paul Sartre|  2|
+----------------+---+



# 7- Déterminer les personnes qui n’ont pas encore rendu les livres.


## DSL

In [27]:
Student.join(borrow, "sid", how ="left")\
        .select("sname")\
        .filter(col("return_time").isNull())\
        .show()

+-----+
|sname|
+-----+
| toto|
+-----+



## SQL

In [21]:
spark.sql(""" SELECT sname FROM Student_sql WHERE sid IN
    (SELECT sid FROM borrow_sql WHERE return_time IS NULL) """).show()

+-----+
|sname|
+-----+
| toto|
+-----+



# 8- Créer une nouvelle colonne dans la table borrow qui prend la valeur 1, si la durée d'emprunt est supérieur à 3 mois,  sinon 0.  (utiliser la fonction spark to_date) par la suite faire un export des données en CSV. dans un répertoire nommé contention (Attention pas de path en dur ! )

In [22]:
borrow = borrow.withColumn("Sup_3_mois",
                 datediff(to_date("return_time","dd-MM-yyyy"),
                         to_date("checkout_time","dd-MM-yyyy")))\
        

In [124]:
borrow.show()

+---+----+-------------+-----------+----------+
|sid| bid|checkout_time|return_time|Sup_3_mois|
+---+----+-------------+-----------+----------+
|S15|0003|   02-01-2020| 01-02-2020|        30|
|S15|0002|   13-06-2020|       null|      null|
|S15|0001|   13-06-2020| 13-10-2020|       122|
|S16|0002|   24-01-2020| 24-01-2020|         0|
|S17|0001|   12-04-2020| 01-07-2020|        80|
+---+----+-------------+-----------+----------+



In [132]:
Superbase = borrow.withColumn("Sup_3_mois",when(col("Sup_3_mois")>=90,1).when(col("Sup_3_mois")<90,0).otherwise(None))

In [135]:
Superbase.toPandas().to_csv("borrow.csv")


# 9-déterminer les livres qui n’ont jamais été empruntés. 

##  cf voir question 2