# Lab 1 - First steps with Spark

At the end of this lab:

- export your notebook: File > Export Notebook As ... > Export Notebook To HTML
- and turn your cluster off!
    
## Creating a Spark session

Write in a new cell:

- `spark` to check that everything is OK

In [54]:
from pyspark.sql import SparkSession

spark = (SparkSession 
         .builder
         .appName("Lab 1")
         .master("local[5]")  # parallelize over 5 threads
         .getOrCreate()
        )

In [55]:
#Spark session
spark



## 💾First steps with Spark - Data importation

Spark's main object class is the DataFrame, which is a distributed table. It is analogous to R's or Python's (pandas) data frames: one row represents an observation, one column represents a variable. But contrary to R or pandas, Spark's DataFrames can be distributed over hundreds of nodes.

Spark supports multiple data formats, and multiple ways to load them.

- data formats: CSV, JSON, Parquet (an open-source column-oriented format)
- can read archive files
- schema detection or user-defined schema. For static data, like a JSON file, schema detection can be used with good results.

Spark has multiple syntaxes to import data. Some are simple with no customisation; others are more complex but let you specify options.

The simplest syntaxes to load a json or a csv file are :

```python
# JSON
json_df = spark.read.json("path/to/file.json")  # replace with the location of your file
# csv
csv_df = spark.read.csv("path/to/file.csv")  # replace with the location of your file

```

In the future, you may consult the [Data Source documentation](https://spark.apache.org/docs/latest/sql-data-sources.html) to have the complete description of Spark's reading abilities.

The data you will use in this lab are real data from the Twitter [sampled stream API](https://developer.twitter.com/en/docs/twitter-api/tweets/sampled-stream/introduction) and [filtered stream API](https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/introduction). The tweets folder contains more than 50 files and more than 2 million tweets. The tweets were collected between 14/04/2021 and 18/04/2021. The total collection time was less than 10 hours.

---

### ✍Hands-on 1  - Data importation

- Load the JSON file stored here: `s3a://nrandriamanana/diffusion/formation/data/tweets/tweets20220414-090219.jsonl.gz` and name your DataFrame `df_tweet`

  <small> ⚙️ This file is in the `JSONL` (JSON Lines) format, which means that each line of it is a JSON object. A JSON object is similar to a Python dictionary or a JavaScript object and looks like this: `{"key1": "value1", "key2": ["array", "of", "many values"]}`. This file has been compressed with gzip, hence the `.jsonl.gz` ending. Also, this file does not magically appear in your S3 storage: it is hosted in one of your teacher's buckets and has been made public, so that you can access it.</small>

- It's possible to load multiple files into a single DataFrame. It's useful when you have daily files and want to process them all. It's the same syntax as the previous one, just specify a folder, like `s3a://nrandriamanana/diffusion/formation/data/tweets/`. Name your DataFrame `df_tweet_big`
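To make the `.jsonl.gz` format concrete, here is a small sketch that writes and reads such a file with the Python standard library only. The records and the file name are made up for the illustration; with Spark you do not need any of this, since `spark.read.json()` decompresses `.gz` files and parses each line for you.

```python
import gzip
import json
import os
import tempfile

# Hypothetical records, loosely shaped like tweets.
records = [
    {"id": "1", "text": "I love #Spark", "tags": ["Spark"]},
    {"id": "2", "text": "Hello", "tags": []},
]

path = os.path.join(tempfile.mkdtemp(), "sample.jsonl.gz")

# Write: one JSON object per line, compressed with gzip.
with gzip.open(path, "wt", encoding="utf-8") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

# Read back: decompress, then parse every line independently.
with gzip.open(path, "rt", encoding="utf-8") as f:
    parsed = [json.loads(line) for line in f]

print(parsed[0]["text"])
```

Because every line is a complete JSON object, a reader can split the file by lines and parse them independently.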

In [51]:
# DataFrame creation
df_tweet = spark.read.json("s3a://nrandriamanana/diffusion/formation/data/tweets/tweets20220414-090219.jsonl.gz")
df_tweet_big = spark.read.json("s3a://nrandriamanana/diffusion/formation/data/tweets/")

# caching
df_tweet.cache()
df_tweet_big.cache()

                                                                                

DataFrame[data: struct<author_id:string,created_at:string,entities:struct<annotations:array<struct<end:bigint,normalized_text:string,probability:double,start:bigint,type:string>>,cashtags:array<struct<end:bigint,start:bigint,tag:string>>,hashtags:array<struct<end:bigint,start:bigint,tag:string>>,mentions:array<struct<end:bigint,id:string,start:bigint,username:string>>,urls:array<struct<description:string,display_url:string,end:bigint,expanded_url:string,images:array<struct<height:bigint,url:string,width:bigint>>,start:bigint,status:bigint,title:string,unwound_url:string,url:string>>>,id:string,lang:string,possibly_sensitive:boolean,public_metrics:struct<like_count:bigint,quote_count:bigint,reply_count:bigint,retweet_count:bigint>,source:string,text:string,withheld:struct<copyright:boolean,country_codes:array<string>>>, includes: struct<users:array<struct<created_at:string,id:string,name:string,username:string,verified:boolean,withheld:struct<country_codes:array<string>>>>>]

Printing schema and some rows

In [52]:
df_tweet.show(5)
df_tweet.printSchema()
df_tweet_big.show(5)
df_tweet_big.printSchema()


                                                                                

+--------------------+--------------------+
|                data|            includes|
+--------------------+--------------------+
|{1037039682822070...|{[{2018-09-04T18:...|
|{1391192446613160...|{[{2021-05-09T00:...|
|{3498010513, 2022...|{[{2015-09-08T23:...|
|{1203504398321647...|{[{2019-12-08T02:...|
|{3111644864, 2022...|{[{2015-03-27T20:...|
+--------------------+--------------------+
only showing top 5 rows

root
 |-- data: struct (nullable = true)
 |    |-- author_id: string (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- entities: struct (nullable = true)
 |    |    |-- annotations: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- normalized_text: string (nullable = true)
 |    |    |    |    |-- probability: double (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- type: string (nullable = tr


+--------------------+--------------------+
|                data|            includes|
+--------------------+--------------------+
|{8566371157357076...|{[{2017-04-24T22:...|
|{1100882149060812...|{[{2019-02-27T22:...|
|{1647610112, 2022...|{[{2013-08-05T11:...|
|{1227057358683066...|{[{2020-02-11T02:...|
|{287239880, 2022-...|{[{2011-04-24T16:...|
+--------------------+--------------------+
only showing top 5 rows

root
 |-- data: struct (nullable = true)
 |    |-- author_id: string (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- entities: struct (nullable = true)
 |    |    |-- annotations: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- normalized_text: string (nullable = true)
 |    |    |    |    |-- probability: double (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- type: string (nullable = tr

                                                                                

## 🥉Data frame basic manipulations

Although DataFrames are immutable, they can be **_transformed_** into other DataFrames, in the sense that a modified copy is returned. Such **transformations** include: filtering, sampling, dropping columns, selecting columns, adding new columns...

First, you can get information about the columns with:

```python
df.columns       # get the column names
df.schema        # get the column names and their respective type
df.printSchema() # same, but human-readable
```

You can select columns with the `select()` method. It takes as arguments a list of column names. For example:

```python
df_with_less_columns = df\
  .select("variable3","variable_four","variable-6")

# Yes, you do need the ugly \ at the end of the line,
# if you want to chain methods between lines in Python
```

You can get nested columns easily with :

```python
df.select("parentField.nestedField")
```

To filter data you can use the `filter()` method. It takes as input an expression that gets evaluated for each observation and should return a boolean. Sampling is performed with the `sample()` method. For example:

```python
df_with_less_rows = df\
  .sample(fraction=0.001)\
  .filter(df.variable1=="value")

# show() prints the rows and returns None, so call it separately
df_with_less_rows.show(10)
```

As said before, your data are distributed over multiple nodes (executors), and the data inside a node are split into partitions. Many transformations can therefore be run on each partition independently, in parallel. They are called *narrow transformations*. For example, to sample a DataFrame, Spark samples every partition in parallel, because sampling each partition separately produces a sample of the whole DataFrame. For some transformations, like `groupBy()`, this is impossible: they cannot be computed partition by partition without moving data between partitions.

![](https://raw.githubusercontent.com/HealerMikado/panorama_big_data_2021/main/labs/lab%202%20-%20first%20steps%20with%20Spark/img/spark_exemple1_pipeline.png)

<!-- take() collect() limit() first() show() -->
<!-- lien vers la doc https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.sql.html#dataframe-apis -->

### 😴Lazy evaluation

Spark has what is known as **lazy evaluation**, in the sense that it will wait as long as it can before performing the actual computation. In other words, when you run an instruction such as:

```python
tweet_author_hashtags = df_tweet_big.select("auteur","hashtags")
```

... you are not executing anything! Rather, you are building an **execution plan**, to be realised later.

Spark is quite extreme in its laziness, since only a handful of methods called **actions**, as opposed to **transformations**, will trigger an execution. The most notable are:

1. `collect()`, explicitly asking Spark to fetch the resulting rows instead of lazily waiting for more instructions,
2. `take(n)`, asking for the first `n` rows,
3. `first()`, an alias for `take(1)`,
4. `show()` and `show(n)`, human-friendly alternatives[^5],
5. `count()`, asking for the number of rows,
6. all the "write" methods (write to a file, write to a database), see [here](https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.sql.html#input-and-output) for the list.

[^5]: `first()` is exactly `take(1)` ([ref](https://stackoverflow.com/questions/37495039/difference-between-spark-rdds-take1-and-first)) and `show()` prints the result instead of returning it as a list of rows ([ref](https://stackoverflow.com/questions/53884994/what-is-the-difference-between-dataframe-show-and-dataframe-take-in-spark-t))

**This has advantages:** on huge data, you don't want to accidentally perform a computation that is not needed. Also, Spark can optimize each **stage** of the execution with regard to what comes next. For instance, filters will be executed as early as possible, since this reduces the number of rows on which to perform later operations. On the contrary, joins are very computation-intensive and will be executed as late as possible. The resulting **execution plan** is a **directed acyclic graph** (DAG) that contains all the operations required for a specific computation, ordered in the most effective fashion.

**This also has drawbacks.** Since the computation is optimized for the end result, the intermediate stages are discarded by default. So if you need a DataFrame multiple times, you have to cache it in memory; otherwise Spark will recompute it every single time.

---

### ✍Hands-on 2 - Data frame basic manipulations

- How many rows do your two DataFrames have?

In [53]:
print(f'the small DF has {df_tweet.count()} rows')
print(f'the big DF has {df_tweet_big.count()} rows')

the small DF has 10000 rows



Py4JJavaError: An error occurred while calling o258.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 60.0 failed 4 times, most recent failure: Lost task 1.3 in stage 60.0 (TID 130) (10.233.86.151 executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: 
The executor with id 5 exited with exit code 137(SIGKILL, possible container OOM).





- Sample `df_tweet_big` and keep only 10% of it. Create a new DataFrame named `df_tweet_sampled`. If computations take too long on the full DataFrame, use this one instead or add a sample transformation in your expression. 

In [None]:
df_tweet_sampled = df_tweet_big.sample(fraction=0.1)

- Define a DataFrame `tweet_author_hashtags`  with only the `auteur` and `hashtags` columns

In [None]:
tweet_author_hashtags = df_tweet_big.select("auteur", "hashtags")
tweet_author_hashtags.show(5)

- Print a few lines of a DataFrame with only the `auteur`, `mentions`, and `urls` columns. (`mentions` and `urls` are both nested columns in `entities`.)

In [None]:
df_tweet_big.select("auteur", "entities.urls", "entities.mentions").show(5)

- Filter your first DataFrame and keep only tweets with more than 1 like. Give a name to this new, transformed DataFrame and print a few lines of it.

In [None]:
df_tweet_liked = df_tweet_big.filter(df_tweet_big.like_count > 1)
df_tweet_liked.show(5)

## 🥈Basic DataFrame column manipulation 

You can add, update, rename, or drop columns of a DataFrame with Spark:

- Drop : `df.drop(columnName : str )`
- Rename : `df.withColumnRenamed(oldName : str, newName : str)`
- Add/update : `df.withColumn(columnName : str, columnExpression)` 

For example

```python
tweet_df_with_like_rt_ratio = tweet_df\
  .withColumn(        # computes a new variable
    "like_rt_ratio",  # name of the new column
    tweet_df.like_count / tweet_df.retweet_count
  )

```

See [here](https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.sql.html#functions) for the list of all functions available in an expression.

### ✍Hands-on 3 - Basic DataFrame column manipulation  

- Define a DataFrame with a column named `interaction_count`. This column is the sum of `like_count`, `reply_count` and `retweet_count`.

In [None]:
df_tweet_big_interation = df_tweet_big.withColumn("interaction_count", df_tweet_big.like_count+df_tweet_big.reply_count+df_tweet_big.retweet_count )
df_tweet_big_interation.show(5)

- Update the DataFrame you imported at the beginning of this lab and drop the `other` column

In [None]:
df_tweet_big = df_tweet_big.drop("other")

## 🥇Advanced DataFrame column manipulation 

### 🥽Array manipulation

Some columns contain arrays (lists) of values instead of just one value. This may seem surprising, but it is actually quite natural. For instance, you may create an array of words from a text, or generate a list of random numbers for each observation, etc.

You may **create arrays of values** with:

- `split(text : string, delimiter : string)`, turning a text into an array of strings

You may **use arrays of values** with:

- `size(array : Array)`, getting the number of elements

- `array_contains(inputArray : Array, value : any)`, checking if some value appears

- `explode(array : Array)`, unnesting an array and duplicating other values. For instance, if I use `explode()` on the hashtags column of this DataFrame:

  | Auteur | Contenu                             | Hashtags         |
  | ------ | ----------------------------------- | ---------------- |
  | Bob    | I love #Spark and #bigdata          | [Spark, bigdata] |
  | Alice  | Just finished #MHrise, best MH ever | [MHrise]         |

  I will get :

  | Auteur | Contenu                             | Hashtags         | Hashtag |
  | ------ | ----------------------------------- | ---------------- | ------- |
  | Bob    | I love #Spark and #bigdata          | [Spark, bigdata] | Spark   |
  | Bob    | I love #Spark and #bigdata          | [Spark, bigdata] | bigdata |
  | Alice  | Just finished #MHrise, best MH ever | [MHrise]         | MHrise  |

  

All these functions must be imported first:

```python
from pyspark.sql.functions import split, explode, size, array_contains
```

Remember that to create a new column, you should use `withColumn()`. For example:

```python
df.withColumn("new column", explode("array"))
```

#### ✍Hands-on 4 - Array manipulation 

- Keep all the tweets with hashtags and for each remaining line, split the hashtag text into an array of hashtags

In [None]:
from pyspark.sql.functions import split, explode, size, array_contains

In [None]:
df_tweet_big.filter(size("hashtags") > 0).withColumn("hashtag", explode("hashtags")).show(5)

- Create a new column with the number of words of the `contenu` column. (Use `split()` + `size()`)

In [None]:
df_tweet_big.withColumn("word_count", size(split("contenu", " "))).show(5)

- Count how many tweets contain the `COVID19` hashtag (use the `count()` action)

In [None]:
df_tweet_big.filter(array_contains("hashtags", "COVID19")).count()

### 🥼User defined function

For very specific column manipulations you will need Spark's `udf()` function (*User Defined Function*). It can be useful if Spark does not provide a feature you want. But Spark is a popular and active project, so before coding a UDF, go check the documentation. For instance, for natural language processing, Spark already has some [functions](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.Tokenizer.html#pyspark.ml.feature.Tokenizer). One last thing: Python UDFs can lead to performance issues (see https://stackoverflow.com/a/38297050), and learning a little bit of Scala or Java can be a good idea.

For example :

```python
# !!!! DOES NOT WORK !!!!
def to_lower_case(string):
	return string.lower()
	
df.withColumn("tweet_lower_case", to_lower_case(df.contenu))
```

will just crash: the function receives a Spark `Column` object (a description of a column, used to build the execution plan), not the string stored in each row, so Python string methods such as `lower()` do not apply. Keep in mind that Spark is a distributed system: the rows live on the executors, while the code of your notebook runs on the driver. We need `udf()` to let Spark ship a Python function to the executors and apply it to every value.

Let us see how it is done :

```python
# imports
from pyspark.sql.functions import udf
from pyspark.sql.functions import explode
from pyspark.sql.types import StringType

# pure python functions
def to_lower_case(string):
    return string.lower()

# user-defined function
to_lower_case_udf = udf(
    lambda x: to_lower_case(x), StringType()
) #we use a lambda function to create the udf.

# df manipulation
df_tweet\
  .select("auteur","hashtags")\
  .filter("size(hashtags)!=0")\
  .withColumn("hashtag", explode("hashtags"))\
  .withColumn("hashtag", to_lower_case_udf("hashtag")).show(10)
```
```

---

#### ✍Hands-on 5 - User defined function 

- Create a user-defined function that counts how many words a tweet contains. (Your function will return an `IntegerType` and not a `StringType`.)

In [None]:
# imports
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# pure python functions
def word_count(string):
    return len(string.split(" "))

# user-defined function
word_count_udf = udf(
    lambda x: word_count(x), IntegerType()
) #we use a lambda function to create the udf.

# df manipulation
df_tweet_big\
  .withColumn("word_count", word_count_udf("contenu")).show(10)

## 🔩Aggregation functions

Spark offers a variety of aggregation functions:

- `count(column : string)` will count every non-null value of the specified column. You can use `count(1)` or `count("*")` to count every row (even rows with only null values).

- `countDistinct(column : string)` and `approx_count_distinct(column : string, percent_error: float)`. If the exact number is irrelevant, `approx_count_distinct()` should be preferred.

  Counting distinct elements exactly cannot be done independently on each partition, and needs a lot of data transfer. But if you only need an approximation, there is an algorithm named HyperLogLog (more info [here](https://databricks.com/fr/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html)) that can be parallelized. 

  ```python
  from pyspark.sql.functions import count, countDistinct, approx_count_distinct
  
  df.select(count("col1")).show()
  df.select(countDistinct("col1")).show()
  df.select(approx_count_distinct("col1", 0.1)).show()
  ```

- You have access to all the other common functions: `min()`, `max()`, `first()`, `last()`, `sum()`, `sumDistinct()`, `avg()`, etc. (You should import them first: `from pyspark.sql.functions import min, max, avg, first, last, sum, sumDistinct`.)

---

### ✍Hands-on 6 - Aggregation functions

- What are the min, max and average of `interaction_count`?

In [None]:
from pyspark.sql.functions import min, max, avg, first, last, sum, sumDistinct

In [None]:
df_tweet_big_interation.select(min("interaction_count"),max("interaction_count"),avg("interaction_count")).first()

- How many tweets have hashtags? Distinct hashtags? Try the approximate count with 0.1 and 0.01 as the maximum estimation error allowed.

In [None]:
from pyspark.sql.functions import count, countDistinct, approx_count_distinct

In [None]:
df_tweet_big.select(count("hashtags"), countDistinct("hashtags"), approx_count_distinct("hashtags", 0.1), approx_count_distinct("hashtags",0.01)).show()

## Grouping functions

As in SQL, you can group rows by a criterion with Spark. Just use the `groupBy(column : string)` method. Then you can compute some aggregations over those groups.

```python
df.groupBy("col1").agg(
  count("col2").alias("quantity") # alias is used to specify the name of the new column
).show() 
```

The `agg()` method can take multiple arguments to compute multiple aggregations at once.

```python
df.groupBy("col1").agg(
  count("col2").alias("quantity"),
  min("col2").alias("min"),
  avg("col3").alias("avg3")
).show()
```

Aggregation and grouping transformations work differently from the previous methods like `filter()`, `select()`, `withColumn()`, etc. Those transformations cannot be run on each partition independently, and need to transfer data between partitions and executors. They are called *wide transformations*.

<img src="https://raw.githubusercontent.com/HealerMikado/panorama_big_data_2021/main/labs/lab%202%20-%20first%20steps%20with%20Spark/img/spark_exemple2_pipeline.png" style="zoom:30%;" />

---

### ✍Hands-on 7 - Grouping functions

- Compute a DataFrame with the min, max and average number of retweets of each `auteur`. Then order it by the max number of retweets in descending order. To do that you can use the following syntax:

  ```python
  from pyspark.sql.functions import desc
  df.orderBy(desc("col"))
  ```

In [None]:
from pyspark.sql.functions import desc
df_tweet_big.groupBy("auteur").agg(min("retweet_count").alias("min_RT"), max("retweet_count").alias("max_RT"), avg("retweet_count").alias("avg_RT")).orderBy(desc("max_RT")).show(5)

## 🔌Spark SQL

Spark understands SQL statements. Using SQL in Spark is neither a hack nor a workaround: it is one of Spark's most powerful features. To use SQL, you need to:

1. Register a view pointing to your DataFrame

   ```python
   my_df.createOrReplaceTempView("view_name")  # the view name is a string of your choice
   ```

2. Use the sql function

   ```python
   spark.sql("""
   Your SQL statement
   """)
   ```

   You could manipulate every registered DataFrame by their view name with plain SQL.

In fact, you can do most of this tutorial without any knowledge of PySpark or Spark. A lot of things can be done in Spark just by knowing SQL and how to use it in Spark. 

### ✍Hands-on 8 - Spark SQL

- How many tweets have hashtags? Distinct hashtags?

In [None]:
df_tweet_big.select("contenu", "hashtags").createOrReplaceTempView("view_hashtag_content")

spark.sql("""
SELECT COUNT(*), COUNT(DISTINCT hashtags)
FROM view_hashtag_content
WHERE size(hashtags) > 0
""").show(5)

- Compute a DataFrame with the min, max and average number of retweets of each `auteur` using Spark SQL

In [None]:
df_tweet_big.createOrReplaceTempView("view_tweet_big")
spark.sql("""
SELECT auteur, min(retweet_count), max(retweet_count), avg(retweet_count)
FROM view_tweet_big
GROUP BY auteur
ORDER BY MAX(retweet_count) DESC
""").show(5)

## Joins in Spark

As in SQL, Spark can join two datasets by comparing the values of one or more keys. Joins are by nature wide transformations, so data will be transferred between executors, which takes time. But Spark distinguishes two cases:

- Big table to big table join
- Big table to small table join
- (The case small table to small table is irrelevant for big data)

And it optimizes the join according to the actual case. (For more information, see *Spark: The Definitive Guide*, pages 148-151.)

Doing a join is pretty easy. You need:

- two DataFrames that share a key (columns holding matching values)
- a join expression
- the join transformation

For instance :

```python
# Creation of 3 small DF
person=spark.createDataFrame([
    (0,"Bill Chambers",0,[100])
    ,(1,"Matei Zaharia",1,[500,250,100])
    ,(2,"Michael Armbrust",1,[250,100])])\
	.toDF("id","name","graduate_program","spark_status")
graduateProgram=spark.createDataFrame([
    (0,"Masters","School of Information","UC Berkeley")
    ,(2,"Masters","EECS","UC Berkeley"),(1,"Ph.D.","EECS","UC Berkeley")])\
	.toDF("id","degree","department","school")
sparkStatus=spark.createDataFrame([
    (500,"Vice President")
    ,(250,"PMC Member")
    ,(100,"Contributor")])\
	.toDF("id","status")
    
# A join expression
joinExpression=person["graduate_program"]==graduateProgram['id']

# The join transformation in action
person.join(graduateProgram, joinExpression).show()
```

By default Spark computes inner joins, but you can pass a third argument to the join transformation to specify the join type. You can do:

- Inner joins: by default or with the "inner" argument
- Outer joins: "outer" argument
- Left / right outer joins: "left_outer", "right_outer" argument
- Left semi join: it's more a filter than a join. It only keeps the rows of the left DataFrame that have a match in the right DataFrame. Use the "left_semi" argument
- Left anti join: the opposite of the previous one. Use the "left_anti" argument
- You can do cross joins too, but it's a very bad idea to do so, so please don't!

### ✍ Hands-on 9 - Joins in Spark

- Import the files stored in `s3n://spark-lab-input-data-ensai20202021/users/` into a DataFrame and join its information to your tweets DataFrame. Filter your new DataFrame to only keep verified users (`verified == True`), then group by user to find the most active user.

In [None]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType, TimestampType, BooleanType
schema = StructType([ \
    StructField("created_at",TimestampType(),True), \
    StructField("id",StringType(),True), \
    StructField("name",StringType(),True), \
    StructField("username", StringType(), True), \
    StructField("withheld",StructType([ 
        StructField("country_codes", ArrayType(StringType(),True)),\
        StructField("scope", StringType(), True)  \
    ])),
    StructField("verified",BooleanType(),True)             
  ])

In [None]:
df_user_big = spark.read.json("s3a://nrandriamanana/diffusion/formation/data/user")
df_user_big.cache()
df_user_big.show(5)

In [102]:
df_user_big.printSchema()


root
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- withheld: struct (nullable = true)
 |    |-- country_codes: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- scope: string (nullable = true)

In [47]:
from pyspark.sql.functions import col, to_date, lit
# users whose account was created after 2019-01-01
df_user_big.filter(to_date(col("created_at")) > to_date(lit("2019-01-01"))).count()

In [48]:
df_user_big.filter("verified").count()


In [49]:
joinExpression=df_user_big["username"]==df_tweet_big['auteur']

df_user_big.join(df_tweet_big, joinExpression).show()


In [50]:
df_user_big.join(df_tweet_big, joinExpression).filter("verified").count()



**DO NOT FORGET TO TURN YOUR CLUSTER OFF!**
