# Project NoSQL: Apache Spark
Made by Stanislas KIESGEN DE RICHTER and Gabriel PRECIGOUT.



## Part 1: Introduction


## Part 2: Installation

### 2.1. Install Java 8 or check its version

Before installing Apache Spark, we need to check from the command prompt that Java 8 (JDK 8) is installed.

Let's open it and check our Java version:
```
C:\Users\gabriel>java -version
openjdk version "1.8.0_272"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_272-b10)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.272-b10, mixed mode)
```
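The version check above can also be scripted. The sketch below is a minimal example, assuming the `java -version` output has the shape shown above; `parse_java_major` is a hypothetical helper name, not part of Spark or of the Java tooling.

```python
import re


def parse_java_major(version_output: str) -> int:
    """Extract the major Java version from `java -version` style output.

    Hypothetical helper for illustration. Java 8 and earlier report
    versions as "1.8.0_272", while later releases report "11.0.9".
    """
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("no version string found")
    major = int(match.group(1))
    # Legacy scheme: "1.8" means Java 8, so the second number is the major one
    if major == 1 and match.group(2) is not None:
        return int(match.group(2))
    return major


sample = 'openjdk version "1.8.0_272"'
print(parse_java_major(sample))  # 8
```

When capturing the real command output from Python, note that `java -version` writes to the standard error stream rather than to standard output.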

### 2.2. Install Python or check its version

Python must also be installed before we can use Apache Spark, so let's check our version:
```
C:\Users\gabriel>python
Python 3.8.6 (tags/v3.8.6:db45529, Sep 23 2020, 15:52:53) [MSC v.1927 64 bit (AMD64)] on win32
```

### 2.3. Downloading Apache Spark

First, go to the following website: https://spark.apache.org/downloads.html.
Once the archive is downloaded, create a folder called Spark wherever you want and extract the compressed file into it.
When the extraction is finished, open the conf folder, rename the file log4j.properties.template to log4j.properties, and open it in a text editor such as WordPad.

Inside log4j.properties, we'll find the following lines:
```
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
```
We will change it to:
```
# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
```
This change hides every log message below the ERROR level that would otherwise be printed when we run commands with Spark.
Save the file and close it.
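The same edit can be applied programmatically. This is a minimal sketch that works on the text of the file; `set_root_log_level` is a hypothetical helper name, and only the `log4j.rootCategory` line shown above is assumed to exist.

```python
def set_root_log_level(properties_text: str, level: str = "ERROR") -> str:
    """Return the properties text with the root logger level replaced.

    Hypothetical helper for illustration: only the line that starts with
    `log4j.rootCategory=` is changed, and its appender list is kept as-is.
    """
    lines = []
    for line in properties_text.splitlines():
        if line.strip().startswith("log4j.rootCategory="):
            # Keep everything after the first comma (the appenders)
            appenders = line.split(",", 1)[1] if "," in line else " console"
            line = f"log4j.rootCategory={level},{appenders}"
        lines.append(line)
    return "\n".join(lines)


original = (
    "# Set everything to be logged to the console\n"
    "log4j.rootCategory=INFO, console"
)
print(set_root_log_level(original))
```

To use it on the real file, read conf/log4j.properties into a string, pass it to the function, and write the result back.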

### 2.4. winutils

If you're not on Windows, you can skip this step; on Windows it is recommended.
Download winutils, which you can find here: https://github.com/steveloughran/winutils
winutils is a Windows binary. Hadoop requires native libraries to work properly on Windows. That includes accessing the file:// filesystem, where Hadoop uses some Windows APIs to implement POSIX-like file access permissions.

These native parts are implemented in hadoop.dll and winutils.exe.

Now we need to create a folder called "winutils" at the root of our Spark folder. Inside it, we'll create another folder called "bin" and copy winutils.exe there.

Open the command prompt (CMD) and type the following commands (in this transcript, the winutils folder is located under C:\Program Files (x86)):
```
C:\Users\gabki>cd C:\Program Files (x86)\winutils\bin

C:\Program Files (x86)\winutils\bin>dir
 Volume in drive C is Windows-SSD
 Volume Serial Number is D801-AF53

 Directory of C:\Program Files (x86)\winutils\bin

14/12/2020  22:40    <DIR>          .
14/12/2020  22:40    <DIR>          ..
14/12/2020  22:27           108 032 winutils.exe
               1 File(s)        108 032 bytes
               2 Dir(s)  16 753 942 528 bytes free
C:\Program Files (x86)\winutils\bin>mkdir \tmp\hive
C:\Program Files (x86)\winutils\bin>winutils.exe ls \tmp\hive
d--------- 1 DESKTOP-2KDT29C\gabki DESKTOP-2KDT29C\gabki 0 Dec 14 2020 \tmp\hive
C:\Program Files (x86)\winutils\bin>dir \tmp\hive
 Volume in drive C is Windows-SSD
 Volume Serial Number is D801-AF53

 Directory of C:\tmp\hive

14/12/2020  22:57    <DIR>          .
14/12/2020  22:57    <DIR>          ..
               0 File(s)              0 bytes
               2 Dir(s)  17 155 751 936 bytes free
C:\Program Files (x86)\winutils\bin>set path=%path%;"C:\Program Files (x86)\winutils\bin"
C:\Program Files (x86)\winutils\bin>winutils.exe ls \tmp\hive
drwxrwxrwx 1 DESKTOP-2KDT29C\gabki DESKTOP-2KDT29C\gabki 0 Dec 14 2020 \tmp\hive
```

Note that `winutils.exe ls` first reports `d---------` and then `drwxrwxrwx` for \tmp\hive. The command that changed the permissions is not shown in the transcript; with winutils this is normally done with `winutils.exe chmod 777 \tmp\hive`.

### 2.5. Definition of the environment variables

We need to define the environment variables and we'll be ready to use Spark!

Let's define the following variables:
- SPARK_HOME with the path to our Spark folder
- JAVA_HOME with the path to the Java JDK
- HADOOP_HOME with the path to the winutils folder (the one that contains bin)
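
Before launching Spark, it can help to confirm that the three variables are visible to Python. The following is a minimal sketch; `missing_variables` is a hypothetical helper name and the example paths are placeholders, not the real installation paths.

```python
import os
from typing import List, Mapping, Optional

REQUIRED_VARIABLES = ("SPARK_HOME", "JAVA_HOME", "HADOOP_HOME")


def missing_variables(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required variables that are unset or empty in `env`.

    Hypothetical helper for illustration; defaults to the process environment.
    """
    if env is None:
        env = os.environ
    return [name for name in REQUIRED_VARIABLES if not env.get(name)]


# Example with a hand-written mapping (placeholder paths)
example_env = {"SPARK_HOME": r"C:\Spark", "JAVA_HOME": r"C:\Java\jdk8"}
print(missing_variables(example_env))  # ['HADOOP_HOME']
```

Calling `missing_variables()` without arguments checks the current process, so an empty list means all three variables are defined.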

### 2.6. Check your installation

Now that everything is set up, you can test Spark to see if it runs:
```
C:\Users\gabriel>cd c:\Spark\bin
c:\Spark\bin>pyspark
Python 3.8.6 (tags/v3.8.6:db45529, Sep 23 2020, 15:52:53) [MSC v.1927 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/

Using Python version 3.8.6 (tags/v3.8.6:db45529, Sep 23 2020 15:52:53)
SparkSession available as 'spark'.
```
We can also test the following programs:

In [1]:
import findspark

findspark.init()

In [2]:
import pyspark

#from pyspark.sql import SparkSession

#spark = SparkSession.builder.getOrCreate()

#df = spark.sql("select 'spark' as hello ")

#df.show()

## Part 3: Query Requests

First we need to initialize our SparkSession and get its SparkContext:

In [3]:
from pyspark.sql import Row, SQLContext, SparkSession

# Reuse the active session if one exists, otherwise create it
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

### 3.1. Creating a Dataframe

Let's create a DataFrame from a list of tuples:

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

data = [(1, "Gabriel", "Paris"), (2, "Stanislas", "Montpelier")]
# Optional explicit schema; left disabled so that Spark infers the column types
#dataSchema = StructType([
#    StructField("studentID", IntegerType(), False),
#    StructField("studentName", StringType(), True),
#    StructField("city", StringType(), True)])
rdd = sc.parallelize(data)
# Turn each tuple into a Row with named fields
students = rdd.map(lambda x: Row(studentID=int(x[0]), studentName=x[1], city=x[2]))
table = sqlContext.createDataFrame(students) #, dataSchema
table.show()

+---------+-----------+----------+
|studentID|studentName|      city|
+---------+-----------+----------+
|        1|    Gabriel|     Paris|
|        2|  Stanislas|Montpelier|
+---------+-----------+----------+



### 3.2. Show the datatype
We can see the datatypes of each column with the following command:

In [5]:
table.printSchema()

root
 |-- studentID: long (nullable = true)
 |-- studentName: string (nullable = true)
 |-- city: string (nullable = true)



### 3.3. Add a column
You can add a column with the `withColumn` method, which replaces an existing column or creates it if it doesn't exist. Here `lit` wraps the constant value 50 so that it can be used as a column:

In [6]:
from pyspark.sql.functions import lit
table = table.withColumn("pocketMoney", lit(50))
table.show()

+---------+-----------+----------+-----------+
|studentID|studentName|      city|pocketMoney|
+---------+-----------+----------+-----------+
|        1|    Gabriel|     Paris|         50|
|        2|  Stanislas|Montpelier|         50|
+---------+-----------+----------+-----------+



### 3.4. Drop a column
You can drop a column from the DataFrame using the `drop` method. We will drop our newly created column 'pocketMoney'.

In [7]:
table = table.drop("pocketMoney")
table.show()

+---------+-----------+----------+
|studentID|studentName|      city|
+---------+-----------+----------+
|        1|    Gabriel|     Paris|
|        2|  Stanislas|Montpelier|
+---------+-----------+----------+



### 3.5. Count the number of rows and columns
You can count the number of rows using the count method.

In [8]:
table.count()

2

We can count the number of columns and display their names using the `columns` attribute:

In [9]:
len(table.columns), table.columns

(3, ['studentID', 'studentName', 'city'])

### 3.6. Show a summary of the dataset

The `describe` method displays the count, mean, standard deviation, min and max of each column:

In [10]:
table.describe().show()

+-------+------------------+-----------+----------+
|summary|         studentID|studentName|      city|
+-------+------------------+-----------+----------+
|  count|                 2|          2|         2|
|   mean|               1.5|       null|      null|
| stddev|0.7071067811865476|       null|      null|
|    min|                 1|    Gabriel|Montpelier|
|    max|                 2|  Stanislas|     Paris|
+-------+------------------+-----------+----------+
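
The figures in this summary can be checked by hand. The sketch below recomputes them with the Python standard library, assuming the two rows shown earlier; it illustrates that the reported `stddev` is the sample standard deviation (dividing by n - 1) and that min and max on string columns follow alphabetical order.

```python
import math
import statistics

# Values taken from the two-row table above
student_ids = [1, 2]
cities = ["Paris", "Montpelier"]

mean = statistics.mean(student_ids)
# Sample standard deviation: sqrt(((1 - 1.5)**2 + (2 - 1.5)**2) / (2 - 1))
sample_stddev = statistics.stdev(student_ids)

print(mean)  # 1.5
print(math.isclose(sample_stddev, math.sqrt(0.5)))  # True
# Strings are compared alphabetically, so "Montpelier" sorts before "Paris"
print(min(cities), max(cities))  # Montpelier Paris
```

The mean and standard deviation are null for `studentName` and `city` because those statistics are not defined for strings.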



### 3.7. Update data in a row
DataFrames are immutable, so we update a value by rebuilding the column with `withColumn`: `when` selects the rows to change, and `otherwise` keeps the existing value for all the other rows.

In [11]:
from pyspark.sql.functions import when
table = table.withColumn("city",
                            when(table["studentID"] == 2,"Villejuif").otherwise(table["city"]))
table.show()

+---------+-----------+---------+
|studentID|studentName|     city|
+---------+-----------+---------+
|        1|    Gabriel|    Paris|
|        2|  Stanislas|Villejuif|
+---------+-----------+---------+



### 3.8. SQL Context

We can also run SQL queries through the SQL context, after registering the DataFrame as a temporary view:

In [17]:
table.createOrReplaceTempView("table")
sqlContext.sql("SELECT city FROM table where studentID>=2").show()

+---------+
|     city|
+---------+
|Villejuif|
+---------+



## Part 4: Dataset Manipulation
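### 4.1. Importing a dataset
Spark has built-in readers for CSV, JSON and text files. Here we will use a CSV file describing the characteristics of countries. The `header` option reads the column names from the first line, `delimiter` sets the field separator to `;`, and `inferSchema` asks Spark to guess the type of each column.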

In [22]:
dataset = spark.read.option("header",True).option("delimiter",";").option("inferSchema",True).csv("factbook.csv")

In [23]:
dataset.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Area(sq km): integer (nullable = true)
 |-- Birth rate(births/1000 population): double (nullable = true)
 |-- Current account balance: long (nullable = true)
 |-- Death rate(deaths/1000 population): double (nullable = true)
 |-- Debt - external: long (nullable = true)
 |-- Electricity - consumption(kWh): long (nullable = true)
 |-- Electricity - production(kWh): long (nullable = true)
 |-- Exports: long (nullable = true)
 |-- GDP: long (nullable = true)
 |-- GDP - per capita: integer (nullable = true)
 |-- GDP - real growth rate(%): double (nullable = true)
 |-- HIV/AIDS - adult prevalence rate(%): double (nullable = true)
 |-- HIV/AIDS - deaths: integer (nullable = true)
 |-- HIV/AIDS - people living with HIV/AIDS: integer (nullable = true)
 |-- Highways(km): integer (nullable = true)
 |-- Imports: long (nullable = true)
 |-- Industrial production growth rate(%): double (nullable = true)
 |-- Infant mortality rate(deaths/1000 live birth

In [24]:
dataset.select("Country").show()

+--------------------+
|             Country|
+--------------------+
|         Afghanistan|
|            Akrotiri|
|             Albania|
|             Algeria|
|      American Samoa|
|             Andorra|
|              Angola|
|            Anguilla|
|          Antarctica|
| Antigua and Barbuda|
|           Argentina|
|             Armenia|
|               Aruba|
|Ashmore and Carti...|
|           Australia|
|             Austria|
|          Azerbaijan|
|         Bahamas The|
|             Bahrain|
|        Baker Island|
+--------------------+
only showing top 20 rows



We can see that the import worked properly, and we now have a dataset on which we can run queries.

We will try some simple queries as well as more complex ones to see how Spark handles them.

### 4.2. Query examples

#### 4.2.1. Where clause example
Using DSL queries, we can select the countries where the death rate is greater than 20, meaning the countries where more than 20 people out of 1000 die each year.

In [45]:
from pyspark.sql.functions import col
dataset.select("Country","Death rate(deaths/1000 population)").where(col("Death rate(deaths/1000 population)")>20).show()

+--------------------+----------------------------------+
|             Country|Death rate(deaths/1000 population)|
+--------------------+----------------------------------+
|         Afghanistan|                             20.75|
|              Angola|                              25.9|
|            Botswana|                             29.36|
|Central African R...|                             20.27|
|             Lesotho|                             25.03|
|              Malawi|                             23.39|
|          Mozambique|                             20.99|
|               Niger|                             21.33|
|        Sierra Leone|                             20.61|
|        South Africa|                             21.32|
|           Swaziland|                             25.26|
|              Zambia|                             20.23|
|            Zimbabwe|                             24.66|
+--------------------+----------------------------------+



#### 4.2.2. Max Example

In [84]:
dataset.agg({"Area(sq km)": "max"}).show()

+----------------+
|max(Area(sq km))|
+----------------+
|        17075200|
+----------------+



#### 4.2.3. Order By example

In [54]:
from pyspark.sql.functions import col, asc,desc
dataset.select("Country","GDP").orderBy(col("GDP - real growth rate(%)").desc()).show()

+-----------------+-------------+
|          Country|          GDP|
+-----------------+-------------+
|             Iraq|  89800000000|
|             Chad|  15660000000|
|          Liberia|   2903000000|
|Equatorial Guinea|   1270000000|
|        Venezuela| 145200000000|
|            Macau|   9100000000|
|          Ukraine| 299100000000|
|           Angola|  23170000000|
|         Ethiopia|  54890000000|
|    Liechtenstein|    825000000|
|         Mongolia|   5332000000|
|       Tajikistan|   7950000000|
|          Uruguay|  49270000000|
|    Faroe Islands|   1000000000|
|       Azerbaijan|  30010000000|
|          Georgia|  14450000000|
|       Kazakhstan| 118400000000|
|            China|7262000000000|
|          Armenia|  13650000000|
|            Qatar|  19490000000|
+-----------------+-------------+
only showing top 20 rows



We can find the highest GDP by sorting in descending order and keeping only the first row with `limit`:

In [80]:
dataset.select("GDP","Country").orderBy(col("GDP").desc()).limit(1).show()


+--------------+-------------+
|           GDP|      Country|
+--------------+-------------+
|11750000000000|United States|
+--------------+-------------+



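#### 4.2.4. Nested query example
We can find the country with the second highest GDP using a nested query. The inner query is collected first, so the highest GDP reaches the outer query as a plain Python value:

In [77]:
# Inner query: fetch the highest GDP to the driver
max_gdp = dataset.select("GDP").orderBy(col("GDP").desc()).limit(1).collect()[0]["GDP"]
# Outer query: among the countries strictly below it, keep the highest GDP
dataset.select("Country","GDP").where(col("GDP") < max_gdp).orderBy(col("GDP").desc()).limit(1).show()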

+---------------+--------------+
|        Country|           GDP|
+---------------+--------------+
|#European Union|11650000000000|
+---------------+--------------+
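
The two-step logic of this query can be reproduced in plain Python. The sketch below uses three GDP values that appear in the outputs above; `second_highest` is a hypothetical helper name. It is only an illustration of the logic, not of how Spark executes the query.

```python
# GDP values copied from the query outputs shown in this section
gdp_rows = [
    ("United States", 11750000000000),
    ("#European Union", 11650000000000),
    ("China", 7262000000000),
]


def second_highest(rows):
    """Return the row with the largest value strictly below the maximum."""
    # Step 1: find the highest value (the "inner query")
    top_value = max(value for _, value in rows)
    # Step 2: keep rows strictly below it, then take the highest of those
    below_top = [row for row in rows if row[1] < top_value]
    return max(below_top, key=lambda row: row[1])


print(second_highest(gdp_rows))  # ('#European Union', 11650000000000)
```

Because the comparison is strict, several rows tied for first place are all excluded. If every row shares the same value, nothing is left below the maximum and `max` raises a `ValueError`.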



#### 4.2.5. Describe 
The `describe` method computes the count, mean, standard deviation, min and max of the selected columns.

In [88]:
dataset.describe("GDP","Area(sq km)","Railways(km)").show()

+-------+--------------------+-----------------+------------------+
|summary|                 GDP|      Area(sq km)|      Railways(km)|
+-------+--------------------+-----------------+------------------+
|  count|                 230|              263|               134|
|   mean|2.925613713043478...|584987.4866920152|10217.246268656716|
| stddev|1.252331334754941...|1881415.546777828|29857.440213148762|
|    min|             1500000|                0|                 6|
|    max|      11750000000000|         17075200|            228464|
+-------+--------------------+-----------------+------------------+



## Part 5: Comparison with other technologies

## Conclusion
From what we saw, Spark supports both SQL and DSL queries. With PySpark, we could do everything we needed from SQL, and we could also combine the queries with other functions and modules in a Python environment.