In [2]:
from pyspark.sql import SparkSession

In [3]:
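# the .config() line is a placeholder option taken from the Spark docs; replace it with real settings
spark = SparkSession \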
    .builder \
    .appName("Sample_EDA") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
df = spark.read.parquet('data')

# Spark
Spark is a cluster computing system that processes data in memory (RAM).
Spark's fundamental data structure is the RDD. To build RDDs, the data is loaded from a distributed file system such as HDFS or EMRFS into in-memory partitions, typically of 128MB.  
**How it works** (on a YARN cluster):  

   1. The data is loaded into RDDs.
   2. The user submits the application to the file system client and to the Resource Manager (RM):
       - The RM allocates a container to run an Application Master (AM).  
         
       - The AM and the RM communicate to allocate containers and resources for the driver program and the executors. (The driver program contains the SparkContext, which communicates with both the worker nodes and the cluster manager, as shown in the image below.)  
         
       - Spark, the AM and the RM communicate to update the files and the Hive metadata.  
         
       - When the job is finished, the AM notifies the RM, which deallocates the resources.


<img src="media/SparkContext.png">

**Where is our data?**  
   In our case, the data is stored in Amazon S3 (object storage) and accessed through the EMR File System (EMRFS), Amazon EMR's implementation of the Hadoop file system on top of S3.

## Spark Shell
To open the interactive Python shell, run `pyspark`.  
Common options:  
  - `--master`: the cluster to connect to.
    - Possible values:
      - `yarn`
      - `spark://masternode:port` (Spark standalone cluster)
      - `mesos://masternode:port` (Apache Mesos cluster; Mesos is a cluster manager)
      - `local[N]`: runs locally with N threads (`local[*]` uses one thread per CPU core)
  - `--jars`: additional JAR files.
  - `--py-files`: additional Python files.
  - `--name`: application name.


## Spark RDD Concepts

### Definition: Resilient Distributed Dataset
RDDs are Spark's lowest level of abstraction. They are implemented in Scala and run on the JVM.
As the name says, an RDD is:
   - Resilient - Spark keeps the lineage (the sequence of transformations) that produced each RDD, so if a partition is lost the system automatically recomputes the data of that partition.   
   - Distributed - Data is stored and processed on multiple nodes in a cluster.  
   - Dataset - A collection of records, such as primitive values or tuples, created either by loading a source or programmatically.

#### RDD Characteristics
`In-Memory`: Data in an RDD is kept in memory for as long as it is needed.  
  
`Immutable`: The data inside an RDD cannot be changed. Every transformation actually creates a new RDD.  
  
`Lazy evaluated`: The data inside an RDD is not loaded or transformed until an action triggers the execution.   
  
`Cacheable`: Data can be persisted in memory or on disk, although disk is less desirable because its access speed is much lower.  
  
`Parallel`: Data is processed in parallel.  
  
`Typed`: RDD records have types (e.g. `RDD[(Int, String)]`, `RDD[Long]`).  
  
`Partitioned`: Records are split into logical partitions and distributed across nodes in a cluster.
  
`Location-stickiness`: An RDD can define placement preferences for computing its partitions (as close to the records as possible).

### Execution: Resilient Distributed Dataset

#### PIPELINE  
Transformations on RDD objects are defined through the API.  
When an action is called, Spark reads the chain of transformations and the DAG scheduler turns it into a Directed Acyclic Graph (DAG) of stages.  
The task scheduler then hands each stage's tasks to the workers/executors.  
The task scheduler is responsible for making sure that each task is processed successfully, which includes retrying it if something goes wrong.

#### Execution components
**JOB**  
The complete processing triggered by one action is called a job.  
**Stages**  
Each job is divided into stages.    
Stage boundaries depend on whether each partition can be processed independently or whether data has to be brought together from several partitions. In other words, two consecutive narrow transformations can be performed in the same stage, but a narrow transformation followed by a wide transformation creates two stages.  
**Task**   
A task is the lowest level of abstraction worth understanding in Spark. A task applies the computation of a stage to a single partition of an RDD. Typically the same computation is applied to all the partitions of the RDD, with the tasks running in parallel.

#### Partitions
Partitions are the blocks into which an RDD's data is split; each partition is processed by one task.  
By default, when reading from a distributed file system, one partition is created for every file-system block needed to build that RDD.
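As a rule of thumb, the number of partitions for a file is its size divided by the block size, rounded up. This is a back-of-the-envelope sketch only, assuming 128MB blocks; the actual count can differ (for example, DataFrame reads use their own split-size settings), and `estimate_partitions` is a hypothetical helper:

```python
import math

BLOCK_SIZE_MB = 128  # assumed block size (the usual HDFS default)

def estimate_partitions(file_size_mb, block_size_mb=BLOCK_SIZE_MB):
    """Rule of thumb: one partition per block; the last block may be only partly full."""
    return math.ceil(file_size_mb / block_size_mb)

print(estimate_partitions(1000))  # 1000 / 128 = 7.8 -> 8 partitions
print(estimate_partitions(128))   # exactly one block -> 1 partition
```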

#### Transformations
Transformations are lazy: each transformation written with the API is only recorded in the RDD's lineage, and nothing runs until an action (such as `collect` or `count`) is called. At that point the DAG scheduler builds the graph of stages and executes it.  
There are 2 types of transformations:
   - **Narrow transformations:** each output partition depends on a single input partition, so they can be applied to every partition independently. Spark groups consecutive narrow transformations into a single stage, which is called pipelining.
       - Map
       - Filter
       - Union
       - Join with co-partitioned inputs
   - **Wide transformations:** output partitions depend on several input partitions, so the data must be shuffled (sent across the network) between partitions.
       - GroupByKey
       - Join with inputs that are not co-partitioned  
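To make the difference concrete without a cluster, here is a plain-Python simulation (not Spark code): an "RDD" is modeled as a list of partitions, narrow operations work on each partition on its own, and the wide operation hash-partitions every record by key, which is what a shuffle does. All function names are hypothetical.

```python
from collections import defaultdict

# An "RDD" modeled as a list of partitions; each partition is a list of (key, value) records
partitions = [
    [("a", 1), ("b", 2)],
    [("a", 3), ("c", 4)],
]

def narrow_map(parts, fn):
    """Narrow: each output partition is built from exactly one input partition."""
    return [[fn(rec) for rec in part] for part in parts]

def narrow_filter(parts, pred):
    """Narrow: filtering also works partition by partition, with no data movement."""
    return [[rec for rec in part if pred(rec)] for part in parts]

def shuffle_by_key(parts, num_out):
    """Wide: any record may move to another partition, chosen by hashing its key."""
    out = [[] for _ in range(num_out)]
    for part in parts:
        for key, value in part:
            out[hash(key) % num_out].append((key, value))
    return out

def sum_by_key(part):
    """Aggregate one partition; after the shuffle all values of a key are local."""
    totals = defaultdict(int)
    for key, value in part:
        totals[key] += value
    return dict(totals)

# Stage 1: two narrow operations pipelined, no record leaves its partition
stage1 = narrow_filter(narrow_map(partitions, lambda kv: (kv[0], kv[1] * 10)),
                       lambda kv: kv[0] != "c")

# Stage boundary: the shuffle regroups records so that equal keys share a partition
shuffled = shuffle_by_key(stage1, num_out=2)

# Stage 2: each partition can now be aggregated independently
result = {}
for part in shuffled:
    result.update(sum_by_key(part))
print(sorted(result.items()))  # [('a', 40), ('b', 20)]
```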
         
**NOTE:** An RDD element can be any value, but Spark recognizes some element types and offers extra functions for them.  
RDDs whose elements are (key, value) tuples are treated as pair RDDs, and RDDs of numeric values are treated as DoubleRDDs (numeric RDDs), which add statistics functions such as `mean`, `stdev` and `stats`.  

This is why some functions only make sense for certain kinds of RDDs, and why the functions that work on pair RDDs carry the `ByKey` suffix (e.g. `reduceByKey`, `groupByKey`).

### RDD Narrow Transformations
**MAP Functions**  
Map functions depend on two things:
   - The map function, which can be of 3 types:
       - Normal map function  
       - Exploding map function  
       - Aggregation map function   
   - The partitions  
   
**Filter Functions**  
  
The filter function keeps only the elements that satisfy a given condition.
  
**Union**  
Union appends the elements of one RDD to another (duplicates are kept).
  
**Join**  
Joins two pair RDDs by key, returning `(key, (left_value, right_value))` for every key present in both.






In [53]:
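## MAP functions
# Build (ID_DATE, 'YYYY/MM/DD') pairs from the integer date, e.g. 20190630 -> '2019/06/30'
def format_date(row):
    s = str(row['ID_DATE'])
    return (row['ID_DATE'], s[0:4] + '/' + s[4:6] + '/' + s[6:8])

data_treated = df.rdd.map(format_date).distinct()
data_treated.collect()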

[(20190630, '2019/06/30'),
 (20190430, '2019/04/30'),
 (20190531, '2019/05/31'),
 (20190331, '2019/03/31')]

In [63]:
##Filter Functions
data_filter = data_treated.filter(lambda x: '30' in x[1]).map(lambda x: (x[0],x[1]+'_filtered'))

In [64]:
data_filter.collect()

[(20190630, '2019/06/30_filtered'), (20190430, '2019/04/30_filtered')]

In [58]:
## Union
data_treated.union(data_filter).collect()

[(20190630, '2019/06/30'),
 (20190430, '2019/04/30'),
 (20190531, '2019/05/31'),
 (20190331, '2019/03/31'),
 (20190630, '2019/06/30'),
 (20190430, '2019/04/30')]

In [65]:
## Join
data_treated.join(data_filter).collect()

[(20190430, ('2019/04/30', '2019/04/30_filtered')),
 (20190630, ('2019/06/30', '2019/06/30_filtered'))]

### RDD Wide transformations
**GroupBy**  
`groupByKey` groups all the values of each key. It does not aggregate by itself: the aggregation is defined afterwards, for example with `mapValues`:  
`rdd.groupByKey().mapValues(aggregation_function)`  
`groupByKey` shuffles every (key, value) pair across the network and only then processes the groups.  
**ReduceByKey**  
`reduceByKey`, on the other hand, first combines the values of each key inside every partition and then shuffles only those partial results, merging them into the final result. This moves much less data than `groupByKey`.  

  - `.reduceByKey(lambda v1, v2: function)`: merges the values of each key pairwise; `v1` and `v2` are two values (or partial results) of the same key, so the function must be associative and commutative.
  - `.groupByKey().function`: in this case we use the `mapValues` function, which is defined as:
  - `.mapValues(lambda a: function)`: `a` is an iterable with all the values of the key, which can be turned into a list or aggregated with other methods.

In [24]:
df.select('ID_DATE').rdd.map(lambda x: str(x[0])+ ' wowow ').distinct().collect()

['20190430 wowow ', '20190531 wowow ', '20190331 wowow ', '20190630 wowow ']

## pair RDD functions  
Spark has specific functions for key-value pair RDDs.  
Common ways to create a pair RDD:  
  - `rdd.map`  
    
  - `rdd.keyBy`   
    
  - `rdd.flatMap`  

Functions:
  - `rdd.countByKey()` : returns a dictionary with the number of occurrences of each key
    
  - `rdd.groupByKey()` : groups all the values of each key
    
  - `rdd.sortByKey()` : sorts by key, in ascending or descending order
    
  - `rdd.join()` : returns an RDD containing all pairs with matching keys from two RDDs
    
  - `rdd.leftOuterJoin()` / `rdd.rightOuterJoin()` / `rdd.fullOuterJoin()`
    
  - `rdd.mapValues()` : executes a function on values only, keeping the key the same (also applicable for flatMapValues)
    
  - `rdd.lookup(key)` : returns the value(s) for a key as a list 
  


# 2. Spark SQL

Spark SQL is a Spark module designed to process structured data.
The data is represented by two different abstractions:
  - DataFrame
  - Dataset (available in Scala and Java; in Python only DataFrames are used)  
  
Both use the same execution engine to compute the result.   
Catalyst, the Spark SQL query optimizer, is built as a rule-based system in which each rule performs one specific optimization on the query plan (for example, pushing filters down or folding constants).  

## 2.1 Data Frames

#### What is it?
A Spark DataFrame is an abstraction that can be thought of as a pandas DataFrame in Python, organized in named columns. The difference is that transformations are lazy: when an action is executed, all the transformations it depends on are combined into a single query plan, optimized, and only then executed.

The Catalyst optimizer turns that query into an optimized physical plan, which is executed as operations on RDDs.

DataFrames are therefore an extension of RDDs with a higher level of abstraction.

#### Features of DataFrames:  
  - Ability to process data volumes from kilobytes to petabytes.
  - Ability to connect to different data sources at the same time, such as SQL databases, NoSQL databases, CSV files, etc. 
  - State-of-the-art code optimization and code generation through the Spark Catalyst optimizer (a tree transformation framework).
  - Integrates easily with other big-data tools and frameworks.
  - Provides APIs for Python, R, Scala and Java.

### 2.1.1 Data Frames Schemas  
Every DataFrame is defined by a schema.
A schema describes how Spark represents the DataFrame: column names, column types and whether each column accepts nulls.
The schema can be given explicitly when the DataFrame is created, or Spark can infer it from the types of the data.

Schemas are `pyspark.sql.types.StructType` objects that contain a list of `pyspark.sql.types.StructField` objects, one per column; each field's type is a `DataType` such as `StringType` or `IntegerType`. Each StructField has:

  - name – string, name of the field.
  - dataType – DataType of the field. Must be instantiated into a specific DataType (e.g. StringType)
  - nullable – boolean, whether the field can be null (None) or not.
  - metadata – a dict from string to simple type that can be serialized to JSON automatically


In [41]:
from pyspark.sql import types as T  # the convention is to import the types module as T
data = [(1,'Valter',100),(2,'Manel',150),(3,'Asdrubal',200)]
fields = [
    T.StructField('id',T.IntegerType(),False),  # nullable=False: this field does not accept nulls
    T.StructField('Name', T.StringType()),       # nullable defaults to True
    T.StructField('value',T.IntegerType())
]
schema = T.StructType(fields)
df = spark.createDataFrame(data,schema)

In [26]:
df.schema

StructType(List(StructField(id,IntegerType,false),StructField(Name,StringType,true),StructField(value,IntegerType,true)))

In [27]:
df.schema.fields

[StructField(id,IntegerType,false),
 StructField(Name,StringType,true),
 StructField(value,IntegerType,true)]

In [33]:
print(df.schema.fields[0].name,df.schema.fields[0].dataType ,df.schema.fields[0].nullable)


id IntegerType False


In [20]:
schema.fieldNames()

['id', 'Name', 'value']


### Rows
`pyspark.sql.Row`: represents a row of data in a DataFrame, whose fields can be accessed as attributes.
A Row object is created by passing each column name with the `=` sign and the value for that column, e.g. `Row(id=1, Name='Valter', value=100)`. In Spark versions before 3.0 the keyword fields were sorted alphabetically by name; since Spark 3.0 they keep the order in which they are passed.
Fields inside a Row can be accessed either by position, like an array, or by name.

In [34]:
df.collect()

[Row(id=1, Name='Valter', value=100),
 Row(id=2, Name='Manel', value=150),
 Row(id=3, Name='Asdrubal', value=200)]

### Columns
Most DataFrame transformations require a column specification, and the API offers several ways of referring to a column:
  - by name, as a string: `df.select('id')`
  - as an attribute of the DataFrame: `df.id`
  - with `pyspark.sql.functions.col('id')` (or its alias `pyspark.sql.functions.column`)
  - by position: `df[0]`
  - by name, with item access: `df['id']`

In [44]:
## hands-on: a simple example
df.select('id').show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
+---+



### Expressions

Spark is compatible with most SQL-like expressions and column operations. These operations include:
  - Arithmetic operators: +, -, %, /, *
  - Comparison operators: >=, >, ==, <, <=, !=
  - Logical operators: AND, OR, NOT in SQL expressions (`&&`, `||`, `!` in the Scala API; `&`, `|`, `~` in the Python API)
  - String functions: like, contains, substr
  - Data testing functions: isNull, isNotNull, isnan (not a number)
  - Sorting functions: asc, desc


### Actions

In [45]:
df.count()  # returns the number of rows

3

In [46]:
n=2
df.take(n)  # returns the first n rows as a list of Row objects

[Row(id=1, Name='Valter', value=100), Row(id=2, Name='Manel', value=150)]

In [48]:
df.collect()  # action: runs all the previously defined transformations through the optimizer and returns every row to the driver

[Row(id=1, Name='Valter', value=100),
 Row(id=2, Name='Manel', value=150),
 Row(id=3, Name='Asdrubal', value=200)]

In [49]:
df.show()  # action: executes the transformations like collect, but prints the first 20 rows as a formatted table

+---+--------+-----+
| id|    Name|value|
+---+--------+-----+
|  1|  Valter|  100|
|  2|   Manel|  150|
|  3|Asdrubal|  200|
+---+--------+-----+



In [50]:
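# df.write returns a DataFrameWriter for saving data into files or tables; it is a property,
# not a method, so calling it as df.write() raises the TypeError below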
##df.write....

TypeError: 'DataFrameWriter' object is not callable

### Transformations

In [52]:
df.select('id').show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
+---+



In [42]:
df.where(df.id==1).collect()

[Row(id=1, Name='Valter', value=100)]

In [53]:
df.limit(n)  # transformation: returns a new DataFrame with at most n rows (nothing is computed yet)

DataFrame[id: int, Name: string, value: int]

In [58]:
df.orderBy(df.Name).show()# orders the data frame by a column

+---+--------+-----+
| id|    Name|value|
+---+--------+-----+
|  3|Asdrubal|  200|
|  2|   Manel|  150|
|  1|  Valter|  100|
+---+--------+-----+



### Joining

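`df.join(other, on, how)`: `other` is the right-hand DataFrame, `on` is the join column name(s) or a join condition, and `how` is the join type (`'inner'` by default; also `'left'`, `'right'`, `'outer'`, `'left_semi'`, `'left_anti'`, `'cross'`).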

In [60]:
df.join(df,'id','inner').show()

+---+--------+-----+--------+-----+
| id|    Name|value|    Name|value|
+---+--------+-----+--------+-----+
|  1|  Valter|  100|  Valter|  100|
|  3|Asdrubal|  200|Asdrubal|  200|
|  2|   Manel|  150|   Manel|  150|
+---+--------+-----+--------+-----+



In [61]:
df.crossJoin(df).show()  # cartesian product
# the DataFrame passed as argument is the right side of the cartesian product;
# the side only changes the column and row order, so when order does not matter the side does not matter

+---+--------+-----+---+--------+-----+
| id|    Name|value| id|    Name|value|
+---+--------+-----+---+--------+-----+
|  1|  Valter|  100|  1|  Valter|  100|
|  1|  Valter|  100|  2|   Manel|  150|
|  1|  Valter|  100|  3|Asdrubal|  200|
|  2|   Manel|  150|  1|  Valter|  100|
|  2|   Manel|  150|  2|   Manel|  150|
|  2|   Manel|  150|  3|Asdrubal|  200|
|  3|Asdrubal|  200|  1|  Valter|  100|
|  3|Asdrubal|  200|  2|   Manel|  150|
|  3|Asdrubal|  200|  3|Asdrubal|  200|
+---+--------+-----+---+--------+-----+



In [67]:
df_extended = df.union(df)  # appends the rows, matching columns by position (duplicates are kept)
df_extended.show()

+---+--------+-----+
| id|    Name|value|
+---+--------+-----+
|  1|  Valter|  100|
|  2|   Manel|  150|
|  3|Asdrubal|  200|
|  1|  Valter|  100|
|  2|   Manel|  150|
|  3|Asdrubal|  200|
+---+--------+-----+



### Aggregation

`df.groupBy()`: groups the DataFrame using the specified columns, allowing aggregate operations. For most aggregate functions (count, sum, max, ...), the physical plan first aggregates partially inside each partition and only shuffles the partial results, much like `reduceByKey` on RDDs.

In [68]:
df.groupBy(df.id).count().show() #Groups the DataFrame using the specified columns, allowing for aggregated operations

+---+-----+
| id|count|
+---+-----+
|  1|    1|
|  3|    1|
|  2|    1|
+---+-----+



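It is also possible to use the more general `.agg()` function. It accepts either Column expressions (from `pyspark.sql.functions`) or a single dictionary that maps each column name to the aggregate function to apply to it. Called directly on a DataFrame, `.agg()` aggregates over all the rows (it is shorthand for `df.groupBy().agg(...)`).

Common aggregate functions are avg, max, min, sum and count.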

In [75]:
df.agg({'value': 'sum'},{'value':'max'}).show()  # fails: agg() takes one dict (one function per column), not several dicts

AssertionError: all exprs should be Column

In [84]:
from pyspark.sql import functions as f
df.agg(f.sum(df.value),f.max(df.value),f.min(df.value)).show()

+----------+----------+----------+
|sum(value)|max(value)|min(value)|
+----------+----------+----------+
|       450|       200|       100|
+----------+----------+----------+



In [92]:
# count the distinct values of every column; * unpacks the generator into separate Column arguments
df.agg(*(f.countDistinct(f.col(c)).alias(c) for c in df.columns)).show()

+---+----+-----+
| id|Name|value|
+---+----+-----+
|  3|   3|    3|
+---+----+-----+



In [95]:
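def printt(x1,x2,x3):  # helper that just prints the three arguments it receives
    print(x1,x2,x3)
    return 1
# the unpacked generator passes one Column expression per DataFrame column
printt(*(f.countDistinct(f.col(c)).alias(c) for c in df.columns))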

Column<b'count(DISTINCT id) AS `id`'> Column<b'count(DISTINCT Name) AS `Name`'> Column<b'count(DISTINCT value) AS `value`'>


1

### Saving DataFrames

To save DataFrames, the DataFrameWriter object is used. It behaves similarly to the DataFrameReader and can save into tables or files. 

The DataFrameWriter (`df.write`) methods include:
  - `.format`: specifies the output data format (e.g. parquet, csv, json)
    
  - `.mode`: determines the behavior when the target directory or table already exists. Options: error, overwrite, append, ignore (default is error)
  
  - `.partitionBy`: stores data in partitioned directories of the form column=value (as in Hive)
  
  - `.option`: sets a format-specific property of the target (e.g. compression)


In [98]:
df.write.format('parquet').save('test.parquet')

In [103]:
spark.read.parquet('test.parquet')

DataFrame[id: int, Name: string, value: int]

### Cool functions

In [105]:
df.repartition("id")  # transformation: hash-partitions the rows by id and returns a new DataFrame

DataFrame[id: int, Name: string, value: int]

In [None]:
## Dataframe cool functions
df = df.repartition("Name")  # hash-partitions by Name: rows with the same Name end up in the same partition
