# BE 1: Introduction

This first notebook aims to present some very high level features of `PySpark`. 

Topics covered:
* Initiating a `SparkSession`
* Creating and manipulating PySpark DataFrames (lazy computation)
* Reading a "big" file and manipulating its data
* Writing results

This first BE is done in a Jupyter Notebook. Note that a notebook should only be used for debugging and testing your application, not for production workloads.


## 1/ Recap on Spark and distributed computation

The PySpark documentation can be found [here](https://spark.apache.org/docs/latest/api/python/getting_started/index.html).



### 1.1/ Cluster

This diagram recaps how data and work are distributed between the master and the workers:

![img/spark-cluster-overview](img/spark-cluster-overview.png)

*NB:* For this BE, everything will be done on the same physical machine (we will "imagine" that we have access to numerous workers coming from several physical machines). All the `PySpark` commands are the same on a cluster.

### 1.2/ Lazy computation

When Spark transforms data, it does not immediately compute the transformation but plans how to compute later. When actions such as `collect()` are explicitly called, the computation starts. This notebook shows the basic usages of the DataFrame, geared mainly for new users.

All actions can be found [here](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions).

Note that `show()` is an action. 
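
As a rough analogy only (plain Python generators, not Spark itself), the difference between describing a transformation and triggering it with an action can be sketched as follows. The names `double_all` and `trace` are hypothetical and only serve the illustration.

```python
# Plain-Python analogy of lazy computation (this is NOT Spark code).
trace = []  # records each value at the moment it is actually processed


def double_all(values):
    """'Transformation': describes the work, which runs only when iterated."""
    for v in values:
        trace.append(v)
        yield v * 2


plan = double_all([1, 2, 3])     # nothing is computed here
steps_before_action = len(trace)

result = list(plan)              # the "action": forces the computation
steps_after_action = len(trace)

print(steps_before_action, steps_after_action, result)
```

Building `plan` performs no work at all; the values are only processed when `list()` consumes the generator, much like a Spark transformation waits for an action.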

**Code**

Follow the BE: just execute the cells and see results.

*Tips*: Use Ctrl + Enter to execute a cell.

**Imports**

List here all your imports (good practice)

In [1]:
# To create a SparkSession:
from pyspark.sql import SparkSession
# Time windowing
from pyspark.sql.window import Window
# To manipulate time data
from datetime import datetime, date
# To manipulate local (non-distributed) DataFrames
import pandas as pd
# To build the rows of a distributed DataFrame
from pyspark.sql import Row
# Built-in column functions (upper, min, max, lag, ...)
import pyspark.sql.functions as F
# Manage time (especially store computing time)
from time import time
# Access to System command and variables
import sys
import os

In [2]:
# Set up environ var
# ----------------------------------------------
# Just two set-up command to make it work.

os.environ['PYSPARK_PYTHON'] = sys.executable
# '/opt/rh/rh-python38/root/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

## 2/ Create a `SparkSession`

`PySpark` applications start with initializing `SparkSession` which is the entry point of `PySpark` as below. 

*NB*: When running in the `PySpark shell` (command line) via the `pyspark` executable, the shell automatically creates the session in the variable `spark` for users.

In [3]:
spark_session = SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/17 16:16:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark_session

## 3/ Exercise 1: word count

Let's start with a log file. We work on a toy example of a log file (source [here](https://www.ibm.com/docs/en/zos/2.2.0?topic=problems-example-log-file)).

This exercise covers:
* Reading a text file
* lines / words count
* filters

### 3.1/ Useful commands

**Read text file**

Let's look at the content of the file:

In [5]:
# Print the first 5 lines of the file to read (Linux command `head -n 5 <file>`)
!head -n 5 data/text.txt
# NB: use `!<OS command>` to run an OS command from Jupyter


This is a structured file containing two kinds of lines:
* lines with date + log (e.g. `03/22 08:51:01 INFO   :...locate_configFile: Specified configuration file: /u/user10/rsvpd1.conf`)
* lines with a two-digit number (e.g. `01`)

In [6]:
# Read a text file line by line
lines=spark_session.sparkContext.textFile('data/text.txt')
# ex: [' 01 ', '03/22 08:51:01 INFO   :.main: *************** RSVP Agent started ***************', ...]

Everything is stored in an RDD ([Resilient Distributed Dataset](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds)), meaning that the data is distributed between workers by *lines* (one block per worker, multiple lines per block).

*NB:* RDDs are rarely manipulated directly in recent `PySpark` code: the latest versions provide `DataFrame` objects, which are easier to manipulate (see the next exercise).

**Count lines**

The `count()` command counts the number of lines. It is an ACTION.

In [7]:
# Count the lines of the file (ACTION)
lines.count()

                                                                                

61

**Count every word**

Here is a short script to count the occurrences of each word.

In [8]:
# Flatten (1 word per 'line')
words = lines.flatMap(lambda line: line.split(' '))
# ex: ['', '01', '', '03/22',...]

# Create "key-value" pairs (key=word, value=1)
words_with_1 = words.map(lambda word: (word, 1))
# ex: [('', 1), ('01', 1), ('', 1), ('03/22', 1),...]

# Sum all "values" having the same "key" (same word)
word_counts = words_with_1.reduceByKey(lambda count1, count2: count1 + count2)
# ex: [('', 104), ('03/22', 52), ...]

# Launch the process and retrieve the result (ACTION)
result = word_counts.collect()

In [9]:
# Print the count of each word
for (word, count) in result[:10]:  # only the first 10
    print(f"Word `{word}`: {count}")

Word ``: 104
Word `03/22`: 52
Word `INFO`: 41
Word `:.main:`: 2
Word `***************`: 2
Word `RSVP`: 13
Word `Agent`: 1
Word `started`: 1
Word `:...locate_configFile:`: 1
Word `configuration`: 1
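
To see what the `flatMap` / `map` / `reduceByKey` chain computes, here is a plain-Python equivalent on two sample lines of the log (a sketch using `collections.Counter`, not Spark code). It also suggests why the empty string `''` is counted as a word: `split(' ')` produces an empty token for each leading, trailing or repeated space, whereas `split()` without argument does not.

```python
from collections import Counter

# Two sample lines taken from the log file used above
sample_lines = [
    " 01 ",
    "03/22 08:51:01 INFO   :.main: Using log level 511",
]

# Equivalent of flatMap: one flat list of "words" for all the lines
words = [word for line in sample_lines for word in line.split(" ")]

# Equivalent of map + reduceByKey: count the occurrences of each word
word_counts = Counter(words)

# Variant: split() without argument ignores repeated, leading and trailing spaces
clean_counts = Counter(word for line in sample_lines for word in line.split())

print(word_counts[""], clean_counts[""])
```

On these two lines, the first count contains four empty "words" while the variant contains none.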


**Filter**

Use `filter(<filter function>)` to filter your objects.

In [10]:
# Keep only the objects of `lines` (lines of the file) containing the word "timer"
lines.filter(lambda l:  "timer" in l).collect()

['03/22 08:51:06 INFO   :....mailslot_create: creating mailslot for timer',
 '03/22 08:51:06 INFO   :...mailbox_register: mailbox allocated for timer']

*Action performed:*
* iterate over each line `l` of the text
    * test whether the word "timer" is in the current line
    * if so, keep the line


*Explanation*: 
* `lambda`: creates a Python "lambda function", i.e. an anonymous function defined in one line
* `l`: input of the function (here the content of the line, e.g. `'03/22 08:51:06 INFO   :....mailslot_create: creating mailslot for timer'`)
* `"timer" in l`: the condition; the line is kept when it evaluates to `True`

**Map: apply a function to each element**

Use `map(<function>)` to apply a `<function>` to each element of the RDD.

In [11]:
# Example: split each line by "/"
r = lines.map(lambda l: l.split("/")).collect()

r[:10]  # only display the first 10

[[' 01 '],
 ['03',
  '22 08:51:01 INFO   :.main: *************** RSVP Agent started ***************'],
 [' 02 '],
 ['03',
  '22 08:51:01 INFO   :...locate_configFile: Specified configuration file: ',
  'u',
  'user10',
  'rsvpd1.conf'],
 ['03', '22 08:51:01 INFO   :.main: Using log level 511'],
 ['03',
  '22 08:51:01 INFO   :..settcpimage: Get TCP images rc - EDC8112I Operation not supported on socket.'],
 [' 03 '],
 ['03',
  '22 08:51:01 INFO   :..settcpimage: Associate with TCP',
  'IP image name = TCPCS'],
 ['03',
  '22 08:51:02 INFO   :..reg_process: registering process with the system'],
 ['03', '22 08:51:02 INFO   :..reg_process: attempt OS', '390 registration']]

### 3.2/ Exercises

**Exercise: filter and count all lines containing "WARNING" (uppercase)**

In [12]:
# Only keep lines containing "WARNING"
lines.filter(lambda l: "WARNING" in l).collect()
# Ex: ['03/22 08:51:06 WARNING:.....mailslot_create: setsockopt(MCAST_ADD) failed - EDC8116I Address not available.', ...]



In [13]:
# Answer:
lines.filter(lambda l: "WARNING" in l).count()

4

**Exercise: count all lines containing only two digits**

E.g. ` 01 `

In [14]:
# Structure of these lines: " xx " => we need to consider all lines starting with a space " ".
lines.filter(lambda l: l[0] == " ").collect()
# ex: [' 01 ', ' 02 ', ' 03 ', ' 04 ', ' 05 ', ' 06 ']

[' 01 ', ' 02 ', ' 03 ', ' 04 ', ' 05 ', ' 06 ']

In [15]:
# Answer:
lines.filter(lambda l: l[0] == " ").count()

6
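
The answer above relies on the fact that, in this file, only the two-digit lines start with a space. A stricter check can be sketched with a regular expression; the helper name `is_two_digit_line` is hypothetical, and the pattern assumes that such lines contain exactly two digits, optionally surrounded by whitespace.

```python
import re

# Assumed layout of the targeted lines: optional spaces, two digits, optional spaces
TWO_DIGITS = re.compile(r"\s*\d{2}\s*")


def is_two_digit_line(line):
    """Return True when the whole line is made of exactly two digits (spaces ignored)."""
    return TWO_DIGITS.fullmatch(line) is not None


samples = [
    " 01 ",
    "03/22 08:51:01 INFO   :.main: Using log level 511",
    " indented text",
    "",
]
flags = [is_two_digit_line(s) for s in samples]
print(flags)
```

Such a predicate could be passed to `filter` as `lines.filter(is_two_digit_line)`. Unlike `l[0] == " "`, it does not raise an error on an empty line.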

**Exercise: drop `"<date> <log level>:"` from each line**
    
Example: 
`'03/22 08:51:06 WARNING:.....mailslot_create: setsockopt(MCAST_ADD) failed - EDC8116I Address not available.'`

into

`'.....mailslot_create: setsockopt(MCAST_ADD) failed - EDC8116I Address not available.'`

In [16]:
# Answer:
lines.map(lambda l: l.split(":.")[1:]).collect()[:10]

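# Explanation: 
# * For each line (`l`)
# * split the line by ":."
# * drop the first element (date and log level)
# NB: the result is a list per line (empty when the line has no ":."),
# and the "." that follows the ":" is dropped together with the separator.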

[[],
 ['main: *************** RSVP Agent started ***************'],
 [],
 ['..locate_configFile: Specified configuration file: /u/user10/rsvpd1.conf'],
 ['main: Using log level 511'],
 ['.settcpimage: Get TCP images rc - EDC8112I Operation not supported on socket.'],
 [],
 ['.settcpimage: Associate with TCP/IP image name = TCPCS'],
 ['.reg_process: registering process with the system'],
 ['.reg_process: attempt OS/390 registration']]
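
The `split(":.")` answer returns lists and also removes the first `.` of the message. As an alternative sketch (plain Python, hypothetical helper `drop_prefix`), a regular expression can strip exactly the `"<date> <log level>:"` prefix and return a string. The pattern below is an assumption about the layout of the log lines (`MM/DD HH:MM:SS LEVEL`, optional spaces, then `:`).

```python
import re

# Assumed prefix layout: "MM/DD HH:MM:SS LEVEL<optional spaces>:"
PREFIX = re.compile(r"^\d{2}/\d{2} \d{2}:\d{2}:\d{2} \w+\s*:")


def drop_prefix(line):
    """Remove the date and log level from a line; other lines are returned unchanged."""
    return PREFIX.sub("", line, count=1)


warning_line = (
    "03/22 08:51:06 WARNING:.....mailslot_create: "
    "setsockopt(MCAST_ADD) failed - EDC8116I Address not available."
)
print(drop_prefix(warning_line))
print(repr(drop_prefix(" 01 ")))
```

With an RDD, this helper could be applied with `lines.map(drop_prefix)`.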

## 3/ Exercise 2: `DataFrame` manipulation

A PySpark DataFrame can be created via `pyspark.sql.SparkSession.createDataFrame`, typically by passing a list of `pyspark.sql.Row` objects. 


First, you can create a PySpark DataFrame from a list of 3 rows:

In [17]:
df = spark_session.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

In [18]:
print(df)

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]


`print()` only shows the column names and types. 

**Schema**

`pyspark.sql.SparkSession.createDataFrame` takes the schema argument to specify the schema of the DataFrame. When it is omitted, PySpark infers the corresponding schema by taking a sample from the data.

In [19]:
df = spark_session.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')


In [20]:
print(df)

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]


**Show**

Let's see what the `df` DataFrame contains, with the `show()` command:

In [21]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



Note that:
* `show()` is an action (explanation below).
* by default, `show()` only displays the first 20 rows.

### 3.1/ `DataFrame` commands

We can manipulate `DataFrames` as tables, with:
* columns selection (`select()`)
* filters (`filter()`)
* column creation (`withColumn()`)
* simple operations (`avg`, `min`, `max`, ...)
* ordering (`orderBy()`)
* any SQL command

**Column selection**

`select()` lets you select one or multiple columns.

In [22]:
# Select cols `a` and `b`
df_ab = df.select("a", "b")

# Show result
df_ab.show()

+---+---+
|  a|  b|
+---+---+
|  1|2.0|
|  2|3.0|
|  3|4.0|
+---+---+



You can chain commands in PySpark:

In [23]:
# Select columns "a" and "b" AND show the result (no intermediate DataFrame is stored in a variable here)
df.select("a", "b").show()

+---+---+
|  a|  b|
+---+---+
|  1|2.0|
|  2|3.0|
|  3|4.0|
+---+---+



**Row filtering**

To select a subset of rows, use `filter()`.

In [24]:
# Select rows where col "a" = 1
df_a = df.filter(df.a == 1)

# Show result
df_a.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



*NB*: Use `df.a` command inside a formula to refer to column "a".

**Column creation**

Store the result of an operation in a new column with the `withColumn()` command.

Usage:
`withColumn(<new column name>, <column content (formula)>)`.

In [25]:
# Create a new column containing column "c" converted to upper case.
df_uc = df.withColumn('upper_c', F.upper(df.c))

df_uc.show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



You can also update an existing column with this command:

In [26]:
# Update column "a" with "a+b"
df.withColumn('a', df.a + df.b).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|3.0|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|5.0|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|7.0|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



*NB*: Find [here](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#module-pyspark.sql.functions) the list of available functions (source: `pyspark.sql.functions` library)

**Simple operations on groups**

PySpark DataFrames also provide a way of handling grouped data with the common split-apply-combine strategy.
It groups the data by a certain condition, applies a function to each group, and then combines the results back into a DataFrame.

Here is an example on a new DataFrame:

In [27]:
df_fruits = spark_session.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df_fruits.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



Group the rows by color, then apply the `avg()` function to the resulting groups:

In [28]:
df_fruits.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+
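
To make the split-apply-combine strategy concrete, here is what `groupby('color').avg()` computes, written in plain Python on the same 8 rows (an illustrative sketch, not Spark code; the variable names are hypothetical).

```python
from collections import defaultdict

# Same rows as `df_fruits`: (color, fruit, v1, v2)
rows = [
    ("red", "banana", 1, 10), ("blue", "banana", 2, 20), ("red", "carrot", 3, 30),
    ("blue", "grape", 4, 40), ("red", "carrot", 5, 50), ("black", "carrot", 6, 60),
    ("red", "banana", 7, 70), ("red", "grape", 8, 80),
]

# Split: gather the numeric values of each color
groups = defaultdict(list)
for color, _fruit, v1, v2 in rows:
    groups[color].append((v1, v2))

# Apply + combine: one (avg(v1), avg(v2)) pair per color
averages = {
    color: (
        sum(v1 for v1, _ in values) / len(values),
        sum(v2 for _, v2 in values) / len(values),
    )
    for color, values in groups.items()
}
print(averages)
```

The values match the Spark output above, e.g. `(4.8, 48.0)` for `red`.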



You can also apply a function (`avg`, `min`, `max`, ...) on all the `Rows` using `select(<function>(<columns_name>))`

In [29]:
df_fruits.select(F.min('v1')).show()

+-------+
|min(v1)|
+-------+
|      1|
+-------+



To access the value (here `1`):

In [30]:
# Store in intermediate var `v`
v = df_fruits.select(F.min('v1'))

v.collect()[0]['min(v1)']

1

**Ordering results**

Use `orderBy(<column_name>)` to sort a DataFrame by `<column_name>`.

In [31]:
# Sort by the `fruit` column, in alphabetical order
df_fruits.orderBy("fruit").show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
| blue|banana|  2| 20|
|  red|banana|  1| 10|
|  red|banana|  7| 70|
|  red|carrot|  5| 50|
|  red|carrot|  3| 30|
|black|carrot|  6| 60|
|  red| grape|  8| 80|
| blue| grape|  4| 40|
+-----+------+---+---+



**Using SQL command**

DataFrame and Spark SQL share the same execution engine, so they can be used interchangeably. For example, you can register the DataFrame as a table and run a SQL query on it as below:

In [32]:
# `df_fruits` will be referenced as "tableA" in our SQL command
df_fruits.createOrReplaceTempView("tableA")

# Count table rows
spark_session.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



### 3.2/ Exercise

You will now read a "big" file with Spark and turn it into a Spark DataFrame. You will manipulate the file `agents.json`, containing (fake) data about 10k agents around the world (source: https://s3-eu-west-1.amazonaws.com/course.oc-static.com/courses/4297166/agents.json). Here are the first 3 lines of this file:

```
{"id":227417393,"longitude":100.85840672174572,"latitude":33.15219798270325,"country_name":"China","sex":"Male"}
{"id":6821129477,"longitude":-72.43795260265814,"latitude":19.325567983697297,"country_name":"Haiti","sex":"Female"}
{"id":2078667700,"longitude":80.85636526088884,"latitude":23.645271492037235,"country_name":"India","sex":"Female"}
```

This kind of data fits the Spark DataFrame format well:
* Columns: corresponding to the fields (id, longitude, ...)
* Multiple rows, one row per agent.

Note that in a Spark DataFrame, the rows are distributed between nodes (the columns are not).

**Read a file**

In [33]:
df = spark_session.read.json("data/agents.json")

df.show()

+--------------------+----------+------------------+-------------------+------+
|        country_name|        id|          latitude|          longitude|   sex|
+--------------------+----------+------------------+-------------------+------+
|               China| 227417393| 33.15219798270325| 100.85840672174572|  Male|
|               Haiti|6821129477|19.325567983697297| -72.43795260265814|Female|
|               India|2078667700|23.645271492037235|  80.85636526088884|Female|
|               China| 477556555| 33.45864668881662|  93.33604038078953|Female|
|               India|1379059984|28.816938290678692|   80.7728698035823|Female|
|               India|2278934249|24.223974351280358|  80.14372690674512|  Male|
|         Philippines|4380736204|12.409991630883784| 122.75874146810197|Female|
|               India|1375733494|22.385712662257426|  77.90320433636231|Female|
|             Nigeria|3693807307| 9.967458870426357|  7.562942449523648|Female|
|                Mali|6552202234|16.8825

**Exercise 1: Count the French males**

In [34]:
# Answer
df.filter(df.country_name == "France").filter(df.sex=="Male").count()

37

**Exercise 2: Count the `Male` and `Female` people per country name**

In [35]:
# Answer
df.groupby("country_name", "sex").count().orderBy("country_name").show()

+------------+------+-----+
|country_name|   sex|count|
+------------+------+-----+
| Afghanistan|Female|   32|
| Afghanistan|  Male|   17|
|     Albania|  Male|    2|
|     Albania|Female|    5|
|     Algeria|Female|   25|
|     Algeria|  Male|   29|
|      Angola|Female|   15|
|      Angola|  Male|   13|
|   Argentina|Female|   34|
|   Argentina|  Male|   18|
|     Armenia|  Male|    1|
|     Armenia|Female|    3|
|   Australia|  Male|   15|
|   Australia|Female|   18|
|     Austria|Female|    9|
|     Austria|  Male|    7|
|  Azerbaijan|  Male|    9|
|  Azerbaijan|Female|    9|
|     Bahrain|  Male|    2|
|  Bangladesh|Female|  109|
+------------+------+-----+
only showing top 20 rows



**Exercise 3: find the maximum `longitude` among all the Chinese `Male` people**

In [36]:
# Answer
df.filter(df.country_name == "China").filter(df.sex == "Male").select(F.max("longitude")).collect()[0]['max(longitude)']

134.55941926197372

## 4/ Exercise 3: Manipulate time series / anomaly detection

In this exercise, you will manipulate timestamped data. 

*Data description*:
* Data comes from the `data.csv` file
* The file contains 2 time series, with the following columns:
    * time: timestamp of the measurement
    * orbit_id: the id of the current orbit (10 orbits in the file)
    * tm_1: values of time series 1
    * tm_2: values of time series 2

The data contains 3 kinds of anomalies: holes, shifts and spikes.

### 4.1/ Commands

**Read data**

In [37]:
# Linux command to display the first 5 lines of the file
!head -n 5 data/data.csv

time,orbit_id,tm_1,tm_2
11/16/2022 15:26:47,0,0,1
11/16/2022 15:27:47,0,1,5
11/16/2022 15:28:47,0,6,2
11/16/2022 15:29:47,0,0,3


In [38]:
# Read a CSV file containing the data; here, the file has a header giving the column names
df_spark = spark_session.read.options(header=True).csv("data/data.csv")

Let's see the schema of the data

In [39]:
df_spark

DataFrame[time: string, orbit_id: string, tm_1: string, tm_2: string]

Ah... `tm_1` and `tm_2` should be numeric, and `orbit_id` should be an integer.

**Exercise: Re-read the file with the correct types**

In [40]:
# Re-read the CSV file (with header), letting Spark infer the column types from the data.
# NB: an explicit schema has to be passed with `.schema(...)`; it is not a key of `options(...)`.
df_spark = spark_session.read.options(header=True, inferSchema=True).csv("data/data.csv")

df_spark

DataFrame[time: string, orbit_id: int, tm_1: int, tm_2: int]

In [41]:
df_spark.show()

+-------------------+--------+----+----+
|               time|orbit_id|tm_1|tm_2|
+-------------------+--------+----+----+
|11/16/2022 15:26:47|       0|   0|   1|
|11/16/2022 15:27:47|       0|   1|   5|
|11/16/2022 15:28:47|       0|   6|   2|
|11/16/2022 15:29:47|       0|   0|   3|
|11/16/2022 15:30:47|       0|   1|   2|
|11/16/2022 15:31:47|       0|   3|   8|
|11/16/2022 15:32:47|       0|   4|   4|
|11/16/2022 15:33:47|       0|   1|   9|
|11/16/2022 15:34:47|       0|   4|   2|
|11/16/2022 15:35:47|       0|   3|   5|
|11/16/2022 15:36:47|       0|   0|   1|
|11/16/2022 15:37:47|       0|   5|   0|
|11/16/2022 15:38:47|       0|   4|   3|
|11/16/2022 15:39:47|       0|   2|   3|
|11/16/2022 15:40:47|       0|   2|   9|
|11/16/2022 15:41:47|       0|   8|   1|
|11/16/2022 15:42:47|       0|   2|   4|
|11/16/2022 15:43:48|       0|   6|   9|
|11/16/2022 15:44:48|       0|   3|   7|
|11/16/2022 15:45:48|       0|   5|   6|
+-------------------+--------+----+----+
only showing top 20 rows

**Find holes**

Some parts of the data contain gaps. You have to find them using a `Window` function.

In [42]:
# Window: one partition per orbit, rows ordered by time inside each partition
my_window = Window.orderBy('time').partitionBy('orbit_id')
# `partitionBy`: the lag below is computed independently inside each `orbit_id` group.

# INPUT: Df with col `time` and other columns with TM values (not used)
# OUTPUT: Same with new col `delay_col`
# PROCESS: Shift column `time` by one row with `lag` (previous timestamp, i.e. t-1)
df_spark = df_spark.withColumn("delay_col", F.lag("time").over(my_window))
# Note: `null` value on the first row of each partition in `delay_col`


In [43]:
df_spark.show(2)

+-------------------+--------+----+----+-------------------+
|               time|orbit_id|tm_1|tm_2|          delay_col|
+-------------------+--------+----+----+-------------------+
|11/16/2022 15:26:47|       0|   0|   1|               null|
|11/16/2022 15:27:47|       0|   1|   5|11/16/2022 15:26:47|
+-------------------+--------+----+----+-------------------+
only showing top 2 rows



In [44]:
# Enable legacy mode for date parsing
spark_session.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# INPUT: Df with col `time`, `delay_col` and other columns with TM values (not used)
# OUTPUT: Same with new col `diff_time_col` containing the delay between consecutive timesteps (in seconds)
# PROCESS: Calculate delay (t - [t-1])
df_spark = df_spark.orderBy("time").withColumn("diff_time_col", (F.unix_timestamp("time") - F.unix_timestamp("delay_col")))

df_spark.show(2)

+-------------------+--------+----+----+-------------------+-------------+
|               time|orbit_id|tm_1|tm_2|          delay_col|diff_time_col|
+-------------------+--------+----+----+-------------------+-------------+
|11/16/2022 15:26:47|       0|   0|   1|               null|         null|
|11/16/2022 15:27:47|       0|   1|   5|11/16/2022 15:26:47|         null|
+-------------------+--------+----+----+-------------------+-------------+
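only showing top 2 rows

*NB:* `diff_time_col` is `null` on every row here, including the second one, where a delay of 60 seconds is expected. By default, `F.unix_timestamp()` parses strings with the `yyyy-MM-dd HH:mm:ss` pattern, whereas `time` is formatted as `MM/dd/yyyy HH:mm:ss`, so the parsing fails. Passing the pattern explicitly, e.g. `F.unix_timestamp("time", "MM/dd/yyyy HH:mm:ss")` (and likewise for `delay_col`), should give the expected delays. The outputs shown in the rest of this section were recorded without this fix.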



In [45]:
# Apply threshold
# -------------------------------------------------------
hole_th = 10  # threshold in minutes
hole_threshold = hole_th * 60  # same threshold, in seconds

# INPUT: Df with col `time`, `delay_col`, `diff_time_col` and other columns with TM values (not used)
# OUTPUT: Only the rows where `diff_time_col` > threshold
# PROCESS: Filter rows > threshold (corresponds to hole)
df_hole_spark = df_spark.filter(F.column("diff_time_col") > hole_threshold)

In [46]:
df_hole_spark.show()

+----+--------+----+----+---------+-------------+
|time|orbit_id|tm_1|tm_2|delay_col|diff_time_col|
+----+--------+----+----+---------+-------------+
+----+--------+----+----+---------+-------------+



**PartitionBy Explained**


Display the first two timestamps of each of the 10 orbits:

In [47]:
# Create a new col `row` containing the row number
# inside each partition of `my_window` (partitioned by "orbit_id"),
# then keep only the first two rows of each partition
df_spark.withColumn("row", F.row_number().over(my_window)).filter(F.col("row") <= 2).show()

+-------------------+--------+----+----+-------------------+-------------+---+
|               time|orbit_id|tm_1|tm_2|          delay_col|diff_time_col|row|
+-------------------+--------+----+----+-------------------+-------------+---+
|11/16/2022 15:26:47|       0|   0|   1|               null|         null|  1|
|11/16/2022 15:27:47|       0|   1|   5|11/16/2022 15:26:47|         null|  2|
|11/16/2022 18:07:54|       1|   9|   8|               null|         null|  1|
|11/16/2022 18:08:54|       1|   7|   0|11/16/2022 18:07:54|         null|  2|
|11/16/2022 20:14:59|       2|   7|  16|               null|         null|  1|
|11/16/2022 20:15:59|       2|   4|   0|11/16/2022 20:14:59|         null|  2|
|11/16/2022 22:39:05|       3|   6|   2|               null|         null|  1|
|11/16/2022 22:40:05|       3|   5|   0|11/16/2022 22:39:05|         null|  2|
|11/17/2022 01:19:12|       4|   4|   4|               null|         null|  1|
|11/17/2022 01:20:12|       4|   8|   7|11/17/2022 0

*NB:* Note that the `null` value appears 10 times in `delay_col`!

> Explanation: 
>* You have partitioned by `orbit_id`, and there are 10 orbits in the dataset
>* The delay is computed between consecutive times inside each partition
>* There is no comparison between partition k and partition k+1, so the delay is `null` for the first row of each partition
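
The per-partition lag described above can be reproduced in plain Python on the first rows of orbits 0 and 1 (an illustrative sketch, not Spark code). The `TIME_FORMAT` pattern is the `strptime` equivalent of the `MM/dd/yyyy HH:mm:ss` layout of the `time` column; the variable names are hypothetical.

```python
from datetime import datetime
from itertools import groupby

TIME_FORMAT = "%m/%d/%Y %H:%M:%S"  # layout of the `time` column

# (time, orbit_id) pairs taken from the table above
rows = [
    ("11/16/2022 15:26:47", 0),
    ("11/16/2022 15:27:47", 0),
    ("11/16/2022 18:07:54", 1),
    ("11/16/2022 18:08:54", 1),
]

# Parse the timestamps, then sort by (orbit_id, time): "partitionBy" + "orderBy"
parsed = sorted((orbit, datetime.strptime(t, TIME_FORMAT)) for t, orbit in rows)

delays = []  # (orbit_id, delay in seconds since the previous row of the same orbit)
for orbit, group in groupby(parsed, key=lambda r: r[0]):
    previous = None  # reset at the start of each partition
    for _, current in group:
        delay = None if previous is None else int((current - previous).total_seconds())
        delays.append((orbit, delay))
        previous = current

print(delays)
```

The first row of each orbit has no predecessor inside its partition, hence one `None` (Spark `null`) per orbit.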

**Write results**

Results are stored in a directory named `foo.csv`, which contains one CSV part file per partition of the DataFrame.

In [55]:
# Write results in a CSV file
df_spark.write.csv('foo.csv', header=True)

### 4.2/ Exercises

**Exercise 1: Get the number of holes for each duration**

The expected result looks like this: 

```
+----------------+-----+
|<hole duration> |count|
+----------------+-----+
|              61|   44|
|            4323|    1|
...
```


In [49]:
# Answer
df_spark.groupBy('diff_time_col').count().show()

+-------------+-----+
|diff_time_col|count|
+-------------+-----+
|         null| 1024|
+-------------+-----+



**Exercise 2: find anomalies of type 'spike' on `tm_2`**

The dataset contains very high values due to sensor anomalies. 

You have to detect them with a simple threshold method.

![img/spike](img/spike.png)

In [50]:
# Threshold
THRESH = [-1, 20]  # min, max

In [51]:
# ANSWER: for `tm_2`
df_spike = df_spark.filter((F.col('tm_2') > THRESH[1]) | (F.col('tm_2') < THRESH[0]))

# Only keep time col
df_spike = df_spike.select('time')

df_spike.show()

+-------------------+
|               time|
+-------------------+
|11/17/2022 10:05:33|
|11/17/2022 10:06:33|
+-------------------+



**Exercise 3: find the orbit containing a shift**

More complex: time series 2 (`tm_2`) contains a shift on a certain orbit. Find it:

* Use a windowed average to detect large shifts (one window per orbit).
* Remember to filter out the spikes, which could affect the average value.
* An "abnormal" value (up to you to define it) should stand out.

![img/shift.png](img/shift.png)

In [52]:
# Answer:

# 1/ Drop spike 
# ---------------------
# Create a column containing `True` if spike detected
df_spark = df_spark.withColumn("is_spike", (F.col('tm_2') > THRESH[1]) | (F.col('tm_2') < THRESH[0]))

df_spark_without_spike = df_spark.filter(~ F.col("is_spike"))

# 2/ Get the mean per orbit, and compare values
# --------------------------------------------------
df_spark_without_spike.groupby("orbit_id").mean("tm_2").show()

# Orbit 2 has a clear anomaly (to be investigated by experts)

+--------+------------------+
|orbit_id|         avg(tm_2)|
+--------+------------------+
|       1| 4.559055118110236|
|       6| 4.680555555555555|
|       3|  4.36144578313253|
|       5| 4.597222222222222|
|       9|             3.875|
|       4|            4.4375|
|       8| 4.119047619047619|
|       7| 4.674242424242424|
|       2|              9.75|
|       0|4.2894736842105265|
+--------+------------------+



Orbit 2 is clearly the one containing anomalies !
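
One possible way to define "abnormal", given here only as an assumption and not as the expected answer, is to compare each orbit mean to the median of all the means, and to flag the orbits that deviate by more than `K` times the median absolute deviation (MAD). The sketch below is plain Python applied to the per-orbit means printed above; `K = 5` is an arbitrary choice.

```python
from statistics import median

# Per-orbit means of `tm_2` (spikes removed), copied from the table above
orbit_means = {
    0: 4.2894736842105265, 1: 4.559055118110236, 2: 9.75, 3: 4.36144578313253,
    4: 4.4375, 5: 4.597222222222222, 6: 4.680555555555555, 7: 4.674242424242424,
    8: 4.119047619047619, 9: 3.875,
}
K = 5  # arbitrary sensitivity factor (assumption)

center = median(orbit_means.values())
# Median absolute deviation: a spread estimate that the shifted orbit barely affects
mad = median(abs(m - center) for m in orbit_means.values())

abnormal_orbits = sorted(
    orbit for orbit, m in orbit_means.items() if abs(m - center) > K * mad
)
print(abnormal_orbits)
```

With these values, only orbit 2 is flagged. A smaller `K` makes the rule more sensitive: with `K = 3`, orbit 9 would be flagged as well.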

In [53]:
# Same computation without removing the spikes: the mean of orbit 7 explodes
df_spark.groupby("orbit_id").mean("tm_2").show()

+--------+------------------+
|orbit_id|         avg(tm_2)|
+--------+------------------+
|       1| 4.559055118110236|
|       6| 4.680555555555555|
|       3|  4.36144578313253|
|       5| 4.597222222222222|
|       9|             3.875|
|       4|            4.4375|
|       8| 4.119047619047619|
|       7| 1497.141791044776|
|       2|              9.75|
|       0|4.2894736842105265|
+--------+------------------+



## 5/ Exercise 4: optimize your workflow  

**Lazy computation explanation**: 
* `spark_session.createDataFrame(...)` creates a DataFrame made of multiple `Row` objects
* These rows are *distributed* between the nodes (e.g. rows 1 and 2 go to node 1, rows 3 and 4 go to node 2)
* *Transformations* (like `filter()`, `select()`, ...) are not executed immediately. They are only recorded by the driver in an execution plan ("lazy" computation)
* The plan is executed only when an *action* (like `show()`, `count()`, ...) is requested.

At execution time:
* 1/ Spark optimizes the plan: it reorganizes the requested transformations (e.g. merging multiple filters, ...) to reduce the computation time. At this stage, nothing is computed yet.
* 2/ The driver sends tasks to the nodes. The nodes execute the optimized workflow in parallel.
* 3/ The results are aggregated on the master node. The final result is then available. In general, this data transfer is a bottleneck in your workflow. Use as few *actions* as possible to avoid triggering this transfer multiple times!


**Exercise: optimize the workflow of the previous exercise**

* Use `cache()` to store intermediate results
* Store results in separate CSV files:
    * holes
    * spikes
    * orbit containing shift
