In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark import SparkContext #SparkToDate

In [4]:
spark = SparkSession.builder\
                    .master("local")\
                    .appName("final")\
                    .getOrCreate()


In [5]:
T1=[("07890","Jean Paul Sartre"),("05678","Pierre de Ronsard")]
Rdd1=spark.sparkContext.parallelize(T1)
Rdd1.collect()
Author=Rdd1.toDF(["aid","name"])
Author.show()
Author.createOrReplaceTempView("Author_sql")

+-----+-----------------+
|  aid|             name|
+-----+-----------------+
|07890| Jean Paul Sartre|
|05678|Pierre de Ronsard|
+-----+-----------------+



In [6]:
T2 = [("0001", "L'existentialisme est un humanisme", "Philosophie"), ("0002", "Huis clos. Suivi de Les Mouches", "Philosophie"), ("0003", "Mignonne allons voir si la rose", "Poeme"), ("0004", "Les Amours", "Poeme")]
RDD2 = spark.sparkContext.parallelize(T2)
book = RDD2.toDF(["bid", "title", "category"])
book.show()
book.createOrReplaceTempView("book_SQL")

+----+--------------------+-----------+
| bid|               title|   category|
+----+--------------------+-----------+
|0001|L'existentialisme...|Philosophie|
|0002|Huis clos. Suivi ...|Philosophie|
|0003|Mignonne allons v...|      Poeme|
|0004|          Les Amours|      Poeme|
+----+--------------------+-----------+



In [7]:
T3 = [("S15", "toto", "math"), ("S16", "popo", "eco"), ("S17", "fofo", "mecanique")]
RDD3 = spark.sparkContext.parallelize(T3)
student = RDD3.toDF(["sid", "sname", "category"])
student.show()
student.createOrReplaceTempView("student_sql")

+---+-----+---------+
|sid|sname| category|
+---+-----+---------+
|S15| toto|     math|
|S16| popo|      eco|
|S17| fofo|mecanique|
+---+-----+---------+



In [8]:
T4 = [("07890", "0001"), ("07890", "0002"), ("05678", "0003"), ("05678", "0003") ]
RDD4 = spark.sparkContext.parallelize(T4)
write = RDD4.toDF(["aid", "bid"])
write.show()
write.createOrReplaceTempView("write_SQL")

+-----+----+
|  aid| bid|
+-----+----+
|07890|0001|
|07890|0002|
|05678|0003|
|05678|0003|
+-----+----+



In [10]:
T5 = [("S15", "0003", "02-01-2020","01-02-2020"), 
      ("S15", "0002", "13-06-2020","null"), 
      ("S15", "0001", "13-06-2020","13-10-2020"),
      ("S16", "0002", "24-01-2020","24-01-2020"),
      ("S17", "0001", "12-04-2020","01-07-2020")]
RDD5 = spark.sparkContext.parallelize(T5)
borrow = RDD5.toDF(["sid", "bid", "checkout_time","return_time"])
borrow.show()
borrow.createOrReplaceTempView("borrow_sql")

+---+----+-------------+-----------+
|sid| bid|checkout_time|return_time|
+---+----+-------------+-----------+
|S15|0003|   02-01-2020| 01-02-2020|
|S15|0002|   13-06-2020|       null|
|S15|0001|   13-06-2020| 13-10-2020|
|S16|0002|   24-01-2020| 24-01-2020|
|S17|0001|   12-04-2020| 01-07-2020|
+---+----+-------------+-----------+



##  1 Livres empruntés par S15

METHODE DSL

In [11]:
student.join(borrow,["sid"]).filter(col("sid")=="S15").join(book,["bid"]).select(col('sid'),col('title')).show()

+---+--------------------+
|sid|               title|
+---+--------------------+
|S15|Huis clos. Suivi ...|
|S15|Mignonne allons v...|
|S15|L'existentialisme...|
+---+--------------------+



METHODE SQL

In [12]:
spark.sql(""" select sid,title from borrow_sql
              left join book_sql on borrow_sql.bid=book_sql.bid
          where sid="S15"  """).show()

+---+--------------------+
|sid|               title|
+---+--------------------+
|S15|Huis clos. Suivi ...|
|S15|Mignonne allons v...|
|S15|L'existentialisme...|
+---+--------------------+



## 2 Les livres jamais empruntés

In [13]:

book.join(borrow,["bid"],how='full').filter(col('sid').isNull()).select(col('title')).show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



In [14]:
spark.sql(""" select title,sid from borrow_sql
              full join book_sql on borrow_sql.bid=book_sql.bid
              where sid is null """).show()

+----------+----+
|     title| sid|
+----------+----+
|Les Amours|null|
+----------+----+



## 3 Etudiants qui ont emprunté le livre bid=0002

In [15]:

student.join(borrow,["sid"]).filter(col("bid")=="0002").select(col('sname'),col('sid'),col('bid')).show()

+-----+---+----+
|sname|sid| bid|
+-----+---+----+
| popo|S16|0002|
| toto|S15|0002|
+-----+---+----+



In [16]:
spark.sql(""" select bid,sname from student_sql
              left join borrow_sql on borrow_sql.sid=student_sql.sid
              where bid="0002" """).show()

+----+-----+
| bid|sname|
+----+-----+
|0002| popo|
|0002| toto|
+----+-----+



## 4 Livres empruntés par des étudiants en mecanique

In [17]:

student.join(borrow,["sid"]).filter(col("category")=="mecanique").join(book,["bid"]).select(col('sid'),col('title')).show()

+---+--------------------+
|sid|               title|
+---+--------------------+
|S17|L'existentialisme...|
+---+--------------------+



In [None]:
spark.sql(""" select category,title from student_sql
              left join borrow_sql on borrow_sql.sid=student_sql.sid
              where category="mecanique" """).show()

## 5 Etudiants n'ayant jamais emprunté de livre

In [20]:

student.join(borrow,["sid"],how='full').filter(col('sid').isNull()).show()

+---+-----+--------+---+-------------+-----------+
|sid|sname|category|bid|checkout_time|return_time|
+---+-----+--------+---+-------------+-----------+
+---+-----+--------+---+-------------+-----------+



In [21]:
spark.sql(""" select * from borrow_sql
              full join student_sql on borrow_sql.sid=student_sql.sid
              where sname is null """).show()

+---+---+-------------+-----------+---+-----+--------+
|sid|bid|checkout_time|return_time|sid|sname|category|
+---+---+-------------+-----------+---+-----+--------+
+---+---+-------------+-----------+---+-----+--------+



## 6 Auteur avec plus de livres écrits


In [22]:
Author.join(write,['aid']).groupBy(col('name')).agg(F.count(col('name'))).show()

+-----------------+-----------+
|             name|count(name)|
+-----------------+-----------+
| Jean Paul Sartre|          2|
|Pierre de Ronsard|          2|
+-----------------+-----------+



In [23]:
spark.sql(""" select name,count(*) as cnt from write_SQL
              left join Author_sql on write_SQL.aid=Author_sql.aid
              group by name""").show()

+-----------------+---+
|             name|cnt|
+-----------------+---+
| Jean Paul Sartre|  2|
|Pierre de Ronsard|  2|
+-----------------+---+



## 7 Etudiant qui n'a pas encore rendu de livre

In [24]:

borrow.join(student,['sid']).filter(col('return_time')=="null").show()

+---+----+-------------+-----------+-----+--------+
|sid| bid|checkout_time|return_time|sname|category|
+---+----+-------------+-----------+-----+--------+
|S15|0002|   13-06-2020|       null| toto|    math|
+---+----+-------------+-----------+-----+--------+



In [25]:
spark.sql(""" select sname from borrow_sql
              left join student_sql on borrow_sql.sid=student_sql.sid
              where return_time ="null" """).show()

+-----+
|sname|
+-----+
| toto|
+-----+



## 8 Nouvelle colonne dans la table borrow et export des données

In [26]:
borrow.withColumn("check_to_date", F.to_date(F.col("checkout_time"), "dd-MM-yyyy"))\
    .withColumn("ret_to_date", F.to_date(F.col("return_time"), "dd-MM-yyyy"))\
    .withColumn("Duree", F.datediff(F.col("ret_to_date"), F.col("check_to_date")))\
    .withColumn("3mois+", (F.when(F.col("Duree")>=90, 1).otherwise(0)))\
    .toPandas().to_csv('retour.csv')