# TP 02: Introduction to Spark DataFrames


## Quick guide to using DataFrame

https://spark.apache.org/docs/latest/sql-programming-guide.html

Python API reference listing all the methods available on a DataFrame:

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html

The aim of this tutorial is to start a Spark session and manipulate DataFrames by applying basic processing.

1. Start a PySpark session

2. Create a DataFrame from an RDD or a list (don't forget to define a schema)

3. Create a pandas DataFrame from a Spark DataFrame

4. Perform operations (compare with list processing)

5. Grouping and joins

6. Reading a CSV file
 
Reference notebook (PySpark DataFrame quickstart):

https://notebooks.gesis.org/binder/jupyter/user/apache-spark-awl6064c/notebooks/python/docs/source/getting_started/quickstart_df.ipynb


In [1]:
# python package list
# !python --version
# !pip list

### 1. Start a spark session

We want to start a local session and give it a name.

We'll use the SparkSession class from the [*pyspark.sql*](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html) sub-module.

We call *sc* the *spark.sparkContext* object.

- *builder*: a class attribute holding a Builder used to construct SparkSession instances.

- Explain what [*SparkSession*](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/spark_session.html) and *sparkContext* represent.

- The master is local: *master("local[\*]")*. The number of worker threads can be specified, e.g. *local[\*]* (one per logical core) or *local[4]* (4 threads).

- The application must be given a name: *appName('name appli')*.

- *getOrCreate()*: gets an existing SparkSession or, if there is none, creates a new one based on the options set in this builder.
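The master string decides how many worker threads Spark uses in local mode: `local` runs a single thread, `local[N]` runs N threads and `local[*]` uses as many threads as there are logical cores. The helper below is a hypothetical illustration of that rule (it is not part of PySpark, approximates the core count with `os.cpu_count()`, and ignores other forms such as `local[N,F]` or cluster URLs like `spark://...`).

```python
import os
import re


def local_threads(master: str) -> int:
    """Hypothetical helper: number of worker threads implied by a local master string."""
    if master == "local":
        return 1  # plain "local" means a single worker thread
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a simple local master string: {master!r}")
    value = match.group(1)
    # "*" means one thread per logical core (approximated here with os.cpu_count())
    return (os.cpu_count() or 1) if value == "*" else int(value)


print(local_threads("local"))     # 1
print(local_threads("local[4]"))  # 4
print(local_threads("local[*]"))  # number of logical cores on this machine
```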

In [2]:
from datetime import datetime, date
import pandas as pd

# findspark (optional) locates a local Spark install and adds pyspark to sys.path;
# it must run before pyspark is imported and is not needed when pyspark is pip-installed
try:
    import findspark
    findspark.init()
except ModuleNotFoundError:
    pass

# import pyspark and SparkSession
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.types import *
from pyspark.sql import Row

In [3]:
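# define the Spark session
# this notebook connects to a standalone cluster master; for a purely local session
# as described above, use .master("local[*]") instead
spark = SparkSession.builder.master("spark://spark-master:7077").appName('TP02_dataframe').getOrCreate()
# create the sparkContext variable
sc = spark.sparkContext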



Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/17 08:22:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# display sparkContext
sc

In [5]:
# pyspark version == Spark version
pyspark.__version__

'4.0.1'

The Spark UI, served on port 4040 by default, is definitely worth a look: it lets you follow the progress of your Spark jobs.

http://localhost:4040



[*.parallelize()*](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.parallelize.html) : Distribute a local Python collection to form an RDD

### 2. Create a dataframe

### 2.1 Create a dataframe from rdd
- create an RDD from a list
     - create the lists (list_1 and list_2)
     - create the RDDs (rdd_l1 from list_1 and rdd_l2 from list_2)


In [6]:
# list
list_1 = [(1, 0), (1, 8), (1, 3), (1, 3), (1, 1), (1, 9), (1, 6), (1, 6), (1, 11), (1, 2)]
print(type(list_1))

# Build RDD
rdd_l1 = sc.parallelize(list_1, 2)
print(rdd_l1)
print(rdd_l1.collect())




<class 'list'>
ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:297
[(1, 0), (1, 8), (1, 3), (1, 3), (1, 1), (1, 9), (1, 6), (1, 6), (1, 11), (1, 2)]


In [7]:
list_2 = [{'numero': 1, 'valeur': 0},{'numero': 1, 'valeur': 2}]

print(type(list_2))

# Build RDD
rdd_l2 = sc.parallelize(list_2, 2)
print(rdd_l2)
print(rdd_l2.collect())

<class 'list'>
ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:297
[{'numero': 1, 'valeur': 0}, {'numero': 1, 'valeur': 2}]


- create an RDD (rdd_l1, resp. rdd_l2) using the parallelize function of the sparkContext and the 2 previously created lists (list_1 and list_2).
- display the number of partitions using the getNumPartitions() method of the RDD.
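To see what the second argument of `parallelize` (the number of slices, here 2) does, here is a pure-Python sketch that cuts a sequence into contiguous slices of near-equal size, in the spirit of how Spark slices a local collection. It is an approximation: PySpark also serializes elements in batches, so exact boundaries may differ for other sizes. In the notebook, `rdd_l1.getNumPartitions()` returns the number of partitions and `rdd_l1.glom().collect()` shows the content of each one. `split_into_slices` is a hypothetical helper.

```python
def split_into_slices(data, num_slices):
    """Hypothetical helper: cut `data` into `num_slices` contiguous, near-equal slices."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


list_1 = [(1, 0), (1, 8), (1, 3), (1, 3), (1, 1), (1, 9), (1, 6), (1, 6), (1, 11), (1, 2)]
partitions = split_into_slices(list_1, 2)

print(len(partitions))  # 2 slices, like rdd_l1.getNumPartitions()
for p in partitions:
    print(p)
```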

- DataFrame:

[pyspark.sql.DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html#)


- Create a DataFrame 

[pyspark.sql.SparkSession.createDataFrame](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.SparkSession.createDataFrame.html)
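When no schema is given, `createDataFrame` infers each column's type from the Python values, which is why the integer tuples of `list_1` become `long` columns named `_1` and `_2` (the query plan prints the same type as `bigint`). The function below is a hypothetical, simplified sketch of that inference for the types used in this notebook only; it is not PySpark's own code, and the real rules also cover bytes, decimals, nested types and nulls.

```python
from datetime import date, datetime


def infer_spark_type(value):
    """Hypothetical, simplified type inference for one Python value (printSchema names)."""
    # order matters: bool is a subclass of int, datetime is a subclass of date
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, datetime):
        return "timestamp"
    if isinstance(value, date):
        return "date"
    raise TypeError(f"type not covered by this sketch: {type(value).__name__}")


def infer_schema(row):
    """Name the columns _1, _2, ... as createDataFrame does for plain tuples."""
    return [(f"_{i}", infer_spark_type(v)) for i, v in enumerate(row, start=1)]


print(infer_schema((1, 0)))
print(infer_schema((1, 2.0, "string1", date(2000, 1, 1), datetime(2000, 1, 1, 12, 0))))
```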


In [7]:
# Create a dataframe directly from the Python list (an RDD such as rdd_l1 also works)
df = spark.createDataFrame(list_1)

# Display the dataframe (only a small one or a small part)
df.show()

# Display the schema
df.printSchema()

# Display the physical plan
df.explain(mode="formatted")

+---+---+
| _1| _2|
+---+---+
|  1|  0|
|  1|  8|
|  1|  3|
|  1|  3|
|  1|  1|
|  1|  9|
|  1|  6|
|  1|  6|
|  1| 11|
|  1|  2|
+---+---+

root
 |-- _1: long (nullable = true)
 |-- _2: long (nullable = true)

== Physical Plan ==
* Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [2]: [_1#0L, _2#1L]
Arguments: [_1#0L, _2#1L], MapPartitionsRDD[6] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)




In [8]:
# It's better with column names
df = spark.createDataFrame(rdd_l1,['v2', 'value'])

# Display dataframe
df.show()

+---+-----+
| v2|value|
+---+-----+
|  1|    0|
|  1|    8|
|  1|    3|
|  1|    3|
|  1|    1|
|  1|    9|
|  1|    6|
|  1|    6|
|  1|   11|
|  1|    2|
+---+-----+



In [9]:
# Define a schema (StructType)

# rdd_l1 has two columns
schema = StructType([
    StructField("v3", IntegerType(), True),
    StructField("value", IntegerType(), True)
])

# use the previous RDD (rdd_l1) to create the dataframe df1
df1 = spark.createDataFrame(rdd_l1, schema)

# display schema
df1.printSchema()

# display result
df1.show()

root
 |-- v3: integer (nullable = true)
 |-- value: integer (nullable = true)

+---+-----+
| v3|value|
+---+-----+
|  1|    0|
|  1|    8|
|  1|    3|
|  1|    3|
|  1|    1|
|  1|    9|
|  1|    6|
|  1|    6|
|  1|   11|
|  1|    2|
+---+-----+



In [17]:
# again with rdd_l2
df = spark.createDataFrame(rdd_l2)

# display schema
df.printSchema()

# display result
df.show()

root
 |-- numero: long (nullable = true)
 |-- valeur: long (nullable = true)

+------+------+
|numero|valeur|
+------+------+
|     1|     0|
|     1|     2|
+------+------+



In [18]:
# count the number of rows
df1.count()

10

In [19]:
# count the rows of df where valeur > 5 (use where and count)
df.where('valeur > 5').count()

0
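To compare with list processing, the same counts can be written on the plain Python lists used above. With Spark, `where` is a lazy transformation and `count` is the action that triggers the computation across partitions; in pure Python everything runs immediately on a single machine.

```python
list_1 = [(1, 0), (1, 8), (1, 3), (1, 3), (1, 1), (1, 9), (1, 6), (1, 6), (1, 11), (1, 2)]
list_2 = [{'numero': 1, 'valeur': 0}, {'numero': 1, 'valeur': 2}]

# equivalent of df1.count()
count_all = len(list_1)

# equivalent of df.where('valeur > 5').count() on the dictionaries of list_2
count_l2 = sum(1 for row in list_2 if row['valeur'] > 5)

# same filter on list_1, where the value is the second element of each tuple
count_l1 = sum(1 for _, value in list_1 if value > 5)

print(count_all, count_l2, count_l1)  # 10 0 5
```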

In [20]:
df.show()
panda = df.toPandas()

panda

+------+------+
|numero|valeur|
+------+------+
|     1|     0|
|     1|     2|
+------+------+



   numero  valeur
0       1       0
1       1       2


- Create a pandas DataFrame

- Create a Spark DataFrame from the pandas DataFrame

- Display the DataFrame and its schema

We will use the data defined in the cell below.

In [21]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)

print("df.show()")

df.show()
print("panda_df")

print(pandas_df)

df.explain(True)

df.show()
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

panda_df
   a    b        c           d                   e
0  1  2.0  string1  2000-01-01 2000-01-01 12:00:00
1  2  3.0  string2  2000-02-01 2000-01-02 12:00:00
2  3  4.0  string3  2000-03-01 2000-01-03 12:00:00
== Parsed Logical Plan ==
LogicalRDD [a#73L, b#74, c#75, d#76, e#77], false

== Analyzed Logical Plan ==
a: bigint, b: double, c: string, d: date, e: timestamp
LogicalRDD [a#73L, b#74, c#75, d#76, e#77], false

== Optimized Logical Plan ==
LogicalRDD [a#73L, b#74, c#75, d#76, e#77], false

== Physical Plan ==
*(1) Scan ExistingRDD[a#73L,b#74,c#75,d#76,e#77]



In [22]:
# display each row vertically (one line per column)

df.show(vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
-RECORD 1------------------
 a   | 2                   
 b   | 3.0                 
 c   | string2             
 d   | 2000-02-01          
 e   | 2000-01-02 12:00:00 
-RECORD 2------------------
 a   | 3                   
 b   | 4.0                 
 c   | string3             
 d   | 2000-03-01          
 e   | 2000-01-03 12:00:00 



In [23]:
df.columns

['a', 'b', 'c', 'd', 'e']

### 3. DataFrame operations

- select columns

- add new columns by processing

- create a pandas DataFrame with 4 columns (1st column of type int, 2nd of type string, 3rd a date, 4th a float value)

- name the columns

In [24]:
pandas_df = pd.DataFrame({
    'indice': [1, 2, 3, 4],
    'prenom': ['john', 'jean', 'pierre', 'jacques'],
    'nom': ['doe', 'dupond', 'smith', 'durand'],
    'date': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1), date(2000, 4, 1)],
    'valeur': [1., 2., 3., 4.]
})
df = spark.createDataFrame(pandas_df)

df.describe().show()

df.show()

+-------+------------------+-------+-----+------------------+
|summary|            indice| prenom|  nom|            valeur|
+-------+------------------+-------+-----+------------------+
|  count|                 4|      4|    4|                 4|
|   mean|               2.5|   NULL| NULL|               2.5|
| stddev|1.2909944487358056|   NULL| NULL|1.2909944487358056|
|    min|                 1|jacques|  doe|               1.0|
|    max|                 4| pierre|smith|               4.0|
+-------+------------------+-------+-----+------------------+

+------+-------+------+----------+------+
|indice| prenom|   nom|      date|valeur|
+------+-------+------+----------+------+
|     1|   john|   doe|2000-01-01|   1.0|
|     2|   jean|dupond|2000-02-01|   2.0|
|     3| pierre| smith|2000-03-01|   3.0|
|     4|jacques|durand|2000-04-01|   4.0|
+------+-------+------+----------+------+



- add a 5th column indicating the type of data in the 2nd column

- add a 6th column which is the result of an arithmetic operation on the 4th column
  
- display the result
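As a point of comparison with list processing, the requested columns can be built row by row on a list of dictionaries that mirrors `pandas_df` (one dictionary per row). The column name `type_prenom` is hypothetical; `operation` reproduces the `valeur + 3` computed in the Spark cell below. Spark's `withColumn` gets the same result from a column expression, without a Python loop.

```python
from datetime import date

rows = [
    {'indice': 1, 'prenom': 'john', 'nom': 'doe', 'date': date(2000, 1, 1), 'valeur': 1.0},
    {'indice': 2, 'prenom': 'jean', 'nom': 'dupond', 'date': date(2000, 2, 1), 'valeur': 2.0},
    {'indice': 3, 'prenom': 'pierre', 'nom': 'smith', 'date': date(2000, 3, 1), 'valeur': 3.0},
    {'indice': 4, 'prenom': 'jacques', 'nom': 'durand', 'date': date(2000, 4, 1), 'valeur': 4.0},
]

# build new rows instead of mutating the old ones (Spark DataFrames are immutable too)
new_rows = [
    {**row,
     'type_prenom': type(row['prenom']).__name__,  # type of the value in the 2nd column
     'operation': row['valeur'] + 3}              # arithmetic on the numeric column
    for row in rows
]

for row in new_rows:
    print(row['prenom'], row['type_prenom'], row['operation'])
```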

In [25]:
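# add a column computed from an existing one
df.withColumn("operation", df.valeur + 3).show()

from pyspark.sql.functions import concat_ws

# concatenate two string columns with a space separator
df.withColumn("nom+prenom", concat_ws(" ", "prenom", "nom")).show()

# note: this explains df itself; withColumn returns a new DataFrame and leaves df unchanged
df.explain(True)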

+------+-------+------+----------+------+---------+
|indice| prenom|   nom|      date|valeur|operation|
+------+-------+------+----------+------+---------+
|     1|   john|   doe|2000-01-01|   1.0|      4.0|
|     2|   jean|dupond|2000-02-01|   2.0|      5.0|
|     3| pierre| smith|2000-03-01|   3.0|      6.0|
|     4|jacques|durand|2000-04-01|   4.0|      7.0|
+------+-------+------+----------+------+---------+

+------+-------+------+----------+------+--------------+
|indice| prenom|   nom|      date|valeur|    nom+prenom|
+------+-------+------+----------+------+--------------+
|     1|   john|   doe|2000-01-01|   1.0|      john doe|
|     2|   jean|dupond|2000-02-01|   2.0|   jean dupond|
|     3| pierre| smith|2000-03-01|   3.0|  pierre smith|
|     4|jacques|durand|2000-04-01|   4.0|jacques durand|
+------+-------+------+----------+------+--------------+

== Parsed Logical Plan ==
LogicalRDD [indice#110L, prenom#111, nom#112, date#113, valeur#114], false

== Analyzed Logical Plan

- apply an operation on a column using a function (a user-defined function, UDF)

(pandas functions can also be applied, through pandas UDFs)
https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html
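The two kinds of UDF used in the next cell differ in how data reaches Python: the function behind a regular `udf` is invoked once per value, while a `pandas_udf` receives whole batches as pandas Series (the batch size is bounded by the `spark.sql.execution.arrow.maxRecordsPerBatch` setting). The pure-Python sketch below mimics that call pattern with plain lists; the helper names and the batch size of 2 are hypothetical.

```python
def tripled(num):
    # row-at-a-time style: one Python call per value
    return 3 * float(num)


def multiply_batch(a, b):
    # vectorized style: one call per batch, working on whole lists at once
    return [x * y for x, y in zip(a, b)]


def apply_per_row(func, column):
    return [func(v) for v in column]


def apply_per_batch(func, column, batch_size):
    """Call func once per batch; return the results and the number of calls made."""
    results, calls = [], 0
    for start in range(0, len(column), batch_size):
        batch = column[start:start + batch_size]
        results.extend(func(batch))
        calls += 1
    return results, calls


valeur = [1.0, 2.0, 3.0, 4.0]
print(apply_per_row(tripled, valeur))  # [3.0, 6.0, 9.0, 12.0], 4 calls to tripled

# valeur * valeur, like multiply(col("valeur"), col("valeur"))
squares, n_calls = apply_per_batch(lambda batch: multiply_batch(batch, batch), valeur, batch_size=2)
print(squares, n_calls)
```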

In [26]:
from pyspark.sql.functions import col, pandas_udf, udf
from pyspark.sql.types import FloatType


# Vectorized (pandas) UDF: receives whole pandas Series batches
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=FloatType())

# Row-at-a-time Python UDF: called once per value
@udf("float")
def tripled(num):
    return 3 * float(num)

# Execute the function as a Spark vectorized UDF
df.select("*", multiply(col("valeur"), col("valeur"))).show()
# note: explain() is called on df itself, so the plan only shows the scan
df.explain(mode="formatted")
df.withColumn('tripled_col', tripled(df.valeur)).show()
df.explain(True)

+------+-------+------+----------+------+-----------------------------+
|indice| prenom|   nom|      date|valeur|multiply_func(valeur, valeur)|
+------+-------+------+----------+------+-----------------------------+
|     1|   john|   doe|2000-01-01|   1.0|                          1.0|
|     2|   jean|dupond|2000-02-01|   2.0|                          4.0|
|     3| pierre| smith|2000-03-01|   3.0|                          9.0|
|     4|jacques|durand|2000-04-01|   4.0|                         16.0|
+------+-------+------+----------+------+-----------------------------+

== Physical Plan ==
* Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [5]: [indice#110L, prenom#111, nom#112, date#113, valeur#114]
Arguments: [indice#110L, prenom#111, nom#112, date#113, valeur#114], MapPartitionsRDD[72] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)


+------+-------+------+----------+------+-----------+
|indice| prenom|   nom|    

### 4. Back to grouping, this time with DataFrames

In [27]:
# the following dataframe

df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])


- group by color and calculate average value and total number of elements
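The task asks for one row per color with the averages and the number of elements. As a comparison with list processing, here is that grouping done by hand with a dictionary on the same eight rows. In Spark, grouping only on `color` with `groupBy("color")` and aggregating with the `avg` and `count` functions of `pyspark.sql.functions` should give the same figures (row order may differ).

```python
from collections import defaultdict

rows = [
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80],
]

# accumulate per color: [sum of v1, sum of v2, number of rows]
acc = defaultdict(lambda: [0, 0, 0])
for color, _fruit, v1, v2 in rows:
    acc[color][0] += v1
    acc[color][1] += v2
    acc[color][2] += 1

# color -> (avg(v1), avg(v2), count)
result = {color: (s1 / n, s2 / n, n) for color, (s1, s2, n) in acc.items()}
for color, stats in result.items():
    print(color, stats)
```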

In [28]:
# show() returns None, so its result is not assigned to a variable
df.groupBy("color", "fruit").mean("v1", "v2").show()


+-----+------+-------+-------+
|color| fruit|avg(v1)|avg(v2)|
+-----+------+-------+-------+
|  red|banana|    4.0|   40.0|
| blue|banana|    2.0|   20.0|
|  red|carrot|    4.0|   40.0|
| blue| grape|    4.0|   40.0|
|black|carrot|    6.0|   60.0|
|  red| grape|    8.0|   80.0|
+-----+------+-------+-------+



Merge (join) two DataFrames

In [29]:
# the two following dataframes:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

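# possible join types: inner, left, right, outer, ...
# joining on time only: each row meets every row of the other side with the same time
df1.join(df2, df1.time == df2.time, "inner") \
     .show(truncate=False)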


+--------+---+---+--------+---+---+
|time    |id |v1 |time    |id |v2 |
+--------+---+---+--------+---+---+
|20000101|1  |1.0|20000101|1  |x  |
|20000101|1  |1.0|20000101|2  |y  |
|20000101|2  |2.0|20000101|1  |x  |
|20000101|2  |2.0|20000101|2  |y  |
+--------+---+---+--------+---+---+
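Joining on `time` alone pairs every row of `df1` with every row of `df2` that has the same `time`, whatever the `id`: the two 20000101 rows on each side give 2 × 2 = 4 rows, and the 20000102 rows of `df1` have no partner, so an inner join drops them. The nested-loop sketch below reproduces this in plain Python and also shows a left join, which keeps the unmatched rows padded with `None` (the equivalent of Spark's nulls). It only illustrates the semantics; Spark picks its own join strategy.

```python
df1_rows = [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)]
df2_rows = [(20000101, 1, 'x'), (20000101, 2, 'y')]


def join_on_time(left, right, how="inner"):
    """Nested-loop join on the first field; 'left' keeps unmatched left rows padded with None."""
    out = []
    for l in left:
        matches = [r for r in right if r[0] == l[0]]
        if matches:
            out.extend(l + r for r in matches)
        elif how == "left":
            out.append(l + (None, None, None))
    return out


inner = join_on_time(df1_rows, df2_rows)
left = join_on_time(df1_rows, df2_rows, how="left")
print(len(inner), len(left))  # 4 6
```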



Perform a join that produces a 4-column dataframe (time, id, v1, v2)

In [30]:
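# passing the key as a list of names keeps a single time column;
# joining on ["time", "id"] would also merge id and give the 4 target columns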
df1.join(df2,["time"],"inner") \
     .show(truncate=False)

+--------+---+---+---+---+
|time    |id |v1 |id |v2 |
+--------+---+---+---+---+
|20000101|1  |1.0|1  |x  |
|20000101|1  |1.0|2  |y  |
|20000101|2  |2.0|1  |x  |
|20000101|2  |2.0|2  |y  |
+--------+---+---+---+---+
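To reach the 4-column target (time, id, v1, v2), the join key must include both `time` and `id`, so that each row only meets its own partner; in Spark this corresponds to passing `["time", "id"]` as the key. Below is a hash-join sketch on that composite key in plain Python: the dictionary plays the role of the lookup table built on the smaller side, assuming each (time, id) pair appears at most once in `df2`.

```python
df1_rows = [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)]
df2_rows = [(20000101, 1, 'x'), (20000101, 2, 'y')]

# build a lookup table on the smaller side, keyed by (time, id)
lookup = {(time, id_): v2 for time, id_, v2 in df2_rows}

# probe it with each row of df1 and keep only the matches (inner join)
joined = [(time, id_, v1, lookup[(time, id_)])
          for time, id_, v1 in df1_rows
          if (time, id_) in lookup]

for row in joined:
    print(row)
```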



### 5. Read a CSV file and create a dataframe

- read csv file and display dataframe
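The first read in the cell below puts each whole line into a single `_c0` column because Spark's CSV reader splits on commas by default, while this file uses `;`. The sketch below does the same parsing in plain Python with the standard `csv` module on the first lines of the file (copied from the notebook output), then turns the `yyyy-MM` strings into dates on the first day of the month, as `to_date(..., 'yyyy-MM')` does in the notebook.

```python
import csv
import io
from datetime import datetime

# first lines of data/trafficaerien.csv, as shown in the notebook output
sample = "Month;Passengers\n2014-01;190111\n2014-02;181712\n2014-03;211813\n"

# with the default ',' delimiter each line stays a single field (like Spark's _c0 column)
default_fields = next(csv.reader(io.StringIO(sample)))
print(default_fields)  # ['Month;Passengers']

# with ';' and the header row, the columns get their names
reader = csv.DictReader(io.StringIO(sample), delimiter=';')
rows = [
    {
        # '%Y-%m' has no day, so strptime uses day 1 (2014-01 -> 2014-01-01)
        'Month': datetime.strptime(r['Month'], '%Y-%m').date(),
        'Passengers': int(r['Passengers']),
    }
    for r in reader
]
for row in rows:
    print(row)
```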

In [31]:
from pyspark.sql.functions import to_date
from pyspark.sql.types import StructType, StructField, DateType, IntegerType

# read the file trafficaerien.csv with the default options:
# the default delimiter is ',' so each line ends up in a single _c0 string column
dftext = spark.read.csv("data/trafficaerien.csv")
dftext.printSchema()

dftext.show()

# with the right delimiter and the header row, columns are named but still strings
dftext1 = spark.read.options(delimiter=';').option("header", True).csv("data/trafficaerien.csv")
dftext1.printSchema()

# convert the 'yyyy-MM' strings into dates (the day defaults to the 1st)
dftext1 = dftext1.withColumn('Month', to_date(dftext1['Month'], 'yyyy-MM'))

dftext1.printSchema()

dftext1.show()

# explicit schema: dateFormat tells the reader how to parse the DateType column
# (inferSchema is not needed when a schema is supplied)
mySchema = StructType([
    StructField("Month", DateType(), True),
    StructField("Passengers", IntegerType(), True)
])

dftext2 = spark.read.options(delimiter=';').option("header", True) \
    .option("dateFormat", "yyyy-MM") \
    .csv("data/trafficaerien.csv", schema=mySchema)

dftext2.printSchema()

dftext2.show()

root
 |-- _c0: string (nullable = true)

+----------------+
|             _c0|
+----------------+
|Month;Passengers|
|  2014-01;190111|
|  2014-02;181712|
|  2014-03;211813|
|  2014-04;241231|
|  2014-05;265480|
|  2014-06;275565|
|  2014-07;298342|
|  2014-08;293137|
|  2014-09;269074|
|  2014-10;256367|
|  2014-11;197014|
|  2014-12;196291|
|  2015-01;193029|
|  2015-02;183588|
|  2015-03;216866|
|  2015-04;238951|
|  2015-05;269396|
|  2015-06;281930|
|  2015-07;302005|
+----------------+
only showing top 20 rows
root
 |-- Month: string (nullable = true)
 |-- Passengers: string (nullable = true)

root
 |-- Month: date (nullable = true)
 |-- Passengers: string (nullable = true)

+----------+----------+
|     Month|Passengers|
+----------+----------+
|2014-01-01|    190111|
|2014-02-01|    181712|
|2014-03-01|    211813|
|2014-04-01|    241231|
|2014-05-01|    265480|
|2014-06-01|    275565|
|2014-07-01|    298342|
|2014-08-01|    293137|
|2014-09-01|    269074|
|2014-10-01|    256367

In [5]:
# stop the Spark session (this also stops its SparkContext)
spark.stop()