<a href="https://colab.research.google.com/github/anismenaa/chainage/blob/main/Colab_TP3_SPARK_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Installation de spark-cluster & findspark, pyspark 

In [1]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz

# unzip the spark file to the current folder
!tar xf spark-3.2.0-bin-hadoop2.7.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop2.7"


# install findspark using pip
!pip install -q findspark

# install pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=6f1289fb5c51113c12e099191e992c7a161244ce2af734c60b7764a71c3d01c5
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


# importation et initialisation de Spark

In [2]:
import findspark 
findspark.init()

## importer PySpark et initialiser SparkContext
![image.png](attachment:image.png)

In [3]:
from pyspark.sql import SparkSession
session =SparkSession.builder.master("local").appName("FirstApp").getOrCreate() 


### Lecture des données

In [4]:
books_df = session.read.csv('books.csv', header=True, inferSchema=True) 

### Afficher des Informations sur le  schéma

In [5]:
books_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- best_book_id: integer (nullable = true)
 |-- work_id: integer (nullable = true)
 |-- books_count: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn13: double (nullable = true)
 |-- authors: string (nullable = true)
 |-- original_publication_year: double (nullable = true)
 |-- original_title: string (nullable = true)
 |-- title: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- ratings_count: string (nullable = true)
 |-- work_ratings_count: string (nullable = true)
 |-- work_text_reviews_count: string (nullable = true)
 |-- ratings_1: double (nullable = true)
 |-- ratings_2: integer (nullable = true)
 |-- ratings_3: integer (nullable = true)
 |-- ratings_4: integer (nullable = true)
 |-- ratings_5: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- small_image_url: string (nullable = true)


In [6]:
type(books_df)

pyspark.sql.dataframe.DataFrame

In [7]:
len(books_df.columns)

23

### Le nombre d'éléments

In [8]:
books_df.count()

10000

### Le DataFrame rating_df

In [9]:
ratings_df = session.read.csv('ratings.csv', header=True, inferSchema=True) 

In [10]:
ratings_df.count()

388533

In [11]:
ratings_df.printSchema()

root
 |-- book_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



### La fonction show()  & head()

In [12]:
ratings_df.show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
|      1|    588|     5|
|      1|   1169|     4|
|      1|   1185|     4|
+-------+-------+------+
only showing top 5 rows



In [13]:
ratings_df.head(5)

[Row(book_id=1, user_id=314, rating=5),
 Row(book_id=1, user_id=439, rating=3),
 Row(book_id=1, user_id=588, rating=5),
 Row(book_id=1, user_id=1169, rating=4),
 Row(book_id=1, user_id=1185, rating=4)]

### Créer un dataFrame From RDD

In [16]:
from pyspark.sql import Row

sc=session.sparkContext

# Chargement du fichier texte et conversion de chaque ligne en Row.
lines = sc.textFile("HousePrice.csv")
lines = lines.filter(lambda line: "city" not in line  or "price" not in line)
parts = lines.map(lambda l: l.split(","))
house = parts.map(lambda p: Row(ville=p[15], prix=float(p[1])))

# Déduire le schéma et enregistrer le DataFrame comme une table.
schemaHouse = session.createDataFrame(house)
schemaHouse.show(10)

+------------+---------+
|       ville|     prix|
+------------+---------+
|   Shoreline| 313000.0|
|     Seattle|2384000.0|
|        Kent| 342000.0|
|    Bellevue| 420000.0|
|     Redmond| 550000.0|
|     Seattle| 490000.0|
|     Redmond| 335000.0|
|Maple Valley| 482000.0|
|  North Bend| 452500.0|
|     Seattle| 640000.0|
+------------+---------+
only showing top 10 rows



### Afficher des statistiques sur les données 

In [17]:
ratings_df.describe('rating').show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|            388532|
|   mean| 3.839624535430801|
| stddev|0.9981900693359621|
|    min|                 1|
|    max|                 5|
+-------+------------------+



### La projection sur un dataFrame avec select()

In [18]:
ratings_df.select("book_id","rating").show(5)

+-------+------+
|book_id|rating|
+-------+------+
|      1|     5|
|      1|     3|
|      1|     5|
|      1|     4|
|      1|     4|
+-------+------+
only showing top 5 rows



### Filter les données d'un dataFrame

In [19]:
ratings_df.filter("rating <= 3").show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    439|     3|
|      1|   5461|     3|
|      1|   7563|     3|
|      1|   9246|     1|
|      1|  20076|     3|
+-------+-------+------+
only showing top 5 rows



In [20]:
ratings_df.filter("rating <= 3 and book_id<10").show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    439|     3|
|      1|   5461|     3|
|      1|   7563|     3|
|      1|   9246|     1|
|      1|  20076|     3|
+-------+-------+------+
only showing top 5 rows



In [21]:
ratings_df.select("book_id","rating").filter("rating <= 3").show(5)

+-------+------+
|book_id|rating|
+-------+------+
|      1|     3|
|      1|     3|
|      1|     3|
|      1|     1|
|      1|     3|
+-------+------+
only showing top 5 rows



### Distinct()

In [22]:
ratings_df.select("user_id").distinct().show(5)


+-------+
|user_id|
+-------+
|  32592|
|  19984|
|  35982|
|   1088|
|   3918|
+-------+
only showing top 5 rows



### Le groupement d'un dataFrame

In [23]:
ratings_df.groupby("rating").count().show()

+------+------+
|rating| count|
+------+------+
|     1|  8642|
|     3| 99425|
|     5|115332|
|     4|138987|
|     2| 26146|
|  null|     1|
+------+------+



In [24]:
ratings_df.groupby('book_id').avg('rating').show(5)

+-------+-----------+
|book_id|avg(rating)|
+-------+-----------+
|    148|       3.57|
|    463|       3.99|
|    471|       3.84|
|    496|       3.79|
|    833|       3.44|
+-------+-----------+
only showing top 5 rows



### Ordonner le résultats

In [None]:
ratings_df.orderBy("rating").show(5)

In [None]:
ratings_df.orderBy(ratings_df.rating.desc()).show(5)

### Jointure entre les dataFrames

In [25]:
ratings_df.join(books_df, books_df.book_id == ratings_df.book_id)\
          .select("user_id","title","rating").show(5)

+-------+--------------------+------+
|user_id|               title|rating|
+-------+--------------------+------+
|    314|Harry Potter and ...|     5|
|    439|Harry Potter and ...|     3|
|    588|Harry Potter and ...|     5|
|   1169|Harry Potter and ...|     4|
|   1185|Harry Potter and ...|     4|
+-------+--------------------+------+
only showing top 5 rows



### SQL natives avec createTempView()

In [26]:
ratings_df.createTempView("rating_table")
session.sql("SELECT * FROM rating_table ORDER BY book_id DESC").show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|   3891|   1376|     2|
|   3891|   3891|     3|
|   3891|   1551|     4|
|   3891|   1886|     4|
|   3891|   3644|     3|
+-------+-------+------+
only showing top 5 rows



### Gestion des colonnes avec withColumn

In [27]:
ratings_df.withColumn("rating", ratings_df.rating*10).show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|    50|
|      1|    439|    30|
|      1|    588|    50|
|      1|   1169|    40|
|      1|   1185|    40|
+-------+-------+------+
only showing top 5 rows



In [28]:
new_dataset = ratings_df.withColumn("rating_ten", ratings_df.rating*10)
new_dataset.show(5)

+-------+-------+------+----------+
|book_id|user_id|rating|rating_ten|
+-------+-------+------+----------+
|      1|    314|     5|        50|
|      1|    439|     3|        30|
|      1|    588|     5|        50|
|      1|   1169|     4|        40|
|      1|   1185|     4|        40|
+-------+-------+------+----------+
only showing top 5 rows



### Supprimer une Colonne

In [29]:
ratings_df.drop('rating').show(5)

+-------+-------+
|book_id|user_id|
+-------+-------+
|      1|    314|
|      1|    439|
|      1|    588|
|      1|   1169|
|      1|   1185|
+-------+-------+
only showing top 5 rows



In [30]:
ratings_df.show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
|      1|    588|     5|
|      1|   1169|     4|
|      1|   1185|     4|
+-------+-------+------+
only showing top 5 rows



### Gestion des doublants et les valeurs nulls

In [31]:
ratings_df.count()

388533

In [32]:
ratings_df.dropDuplicates().count()

387967

In [33]:
ratings_df.dropna('any').count() # drop a row if it contains any nulls

388532

In [34]:
ratings_df.dropna('all').count() # drop a row if it contains any nulls

388533

### Conversion

In [35]:
ratings_df.toPandas()

Unnamed: 0,book_id,user_id,rating
0,1,314,5.0
1,1,439,3.0
2,1,588,5.0
3,1,1169,4.0
4,1,1185,4.0
...,...,...,...
388528,3891,19618,3.0
388529,3891,20512,4.0
388530,3891,20931,3.0
388531,3891,21946,2.0


In [36]:
rdd_convert = ratings_df.rdd
rdd_convert.collect()

[Row(book_id=1, user_id=314, rating=5),
 Row(book_id=1, user_id=439, rating=3),
 Row(book_id=1, user_id=588, rating=5),
 Row(book_id=1, user_id=1169, rating=4),
 Row(book_id=1, user_id=1185, rating=4),
 Row(book_id=1, user_id=2077, rating=4),
 Row(book_id=1, user_id=2487, rating=4),
 Row(book_id=1, user_id=2900, rating=5),
 Row(book_id=1, user_id=3662, rating=4),
 Row(book_id=1, user_id=3922, rating=5),
 Row(book_id=1, user_id=5379, rating=5),
 Row(book_id=1, user_id=5461, rating=3),
 Row(book_id=1, user_id=5885, rating=5),
 Row(book_id=1, user_id=6630, rating=5),
 Row(book_id=1, user_id=7563, rating=3),
 Row(book_id=1, user_id=9246, rating=1),
 Row(book_id=1, user_id=10140, rating=4),
 Row(book_id=1, user_id=10146, rating=5),
 Row(book_id=1, user_id=10246, rating=4),
 Row(book_id=1, user_id=10335, rating=4),
 Row(book_id=1, user_id=10610, rating=5),
 Row(book_id=1, user_id=10944, rating=5),
 Row(book_id=1, user_id=11854, rating=4),
 Row(book_id=1, user_id=11927, rating=4),
 Row(book_i