###  DOCUMENTATION:  
    https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html

### Initiate Spark 

You control a Spark application through a driver process called the SparkSession.

1) on the console (the ```pyspark``` shell): the session is already available as ```spark```

2) in a Jupyter notebook: start ```jupyter notebook```, then run the following in a cell


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Spark UI 

```http://localhost:4040/jobs/```


some documentation: https://spark.apache.org/docs/latest/web-ui.html


#todo to 5 

## Ch5: Basic Structured Operations

### Dataframes Schemas

1) schema-on-read (autodetect)

2) defined explicitly

In [2]:
# load the file and check the inferred schema (without .schema(myManualSchema))
spark.read.format('json').load('./data/flight-data/json/2015-summary.json').schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

In [4]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), False, metadata={"hello":"world"})
])
flights_df = spark.read.format("json").schema(myManualSchema)\
  .load("./data/flight-data/json/2015-summary.json")

flights_df.schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

A schema is a ```StructType``` built from ```StructField``` objects, each made of:

    1) nameColumn
    
    2) typeColumn
    
    3) Nullable
    
    4) metadata (optional)

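To check the schema (note that ```count``` was declared non-nullable in ```myManualSchema```, yet it is reported as nullable after reading the file):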

In [5]:
flights_df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



### Columns

The book combines the Scala and PySpark APIs.

In the Scala / Java API, ```df.col("column_name")```, ```df.col('column_name)```, ```df("column_name")``` or ```df.apply("column_name")``` return the Column.

In PySpark, use one of the following to get a column from a DataFrame:

```df.colName```
```df["colName"]```

<b>However</b>, inside ```select``` it is also possible to use ```col("column_name")```.

In [6]:
from pyspark.sql.functions import col, column

# df("someColumnName")
flights_df["DEST_COUNTRY_NAME"]
flights_df.DEST_COUNTRY_NAME


Column<'DEST_COUNTRY_NAME'>

In [7]:
flights_df.columns #column property to access columns 

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

### Expressions

The ```expr``` function parses transformations and column references from a string.

In [8]:
from pyspark.sql.functions import expr

df = spark.range(500).toDF("number")
df.select(df["number"] + 10).take(3)
 
df.select(expr("(((number + 5) * 200) - 6) < 5")).take(3)

[Row(((((number + 5) * 200) - 6) < 5)=False),
 Row(((((number + 5) * 200) - 6) < 5)=False),
 Row(((((number + 5) * 200) - 6) < 5)=False)]

### Rows
Each row is a single record, represented as an object of type ```Row```. To manipulate an object of type ```Row```, use a column expression (see the previous section). Internally, ```Row``` objects represent arrays of bytes.

In [9]:
df.first()  # first() returns a single record as a Row object

Row(number=0)

#### Create Rows
1) manually instantiating a ```Row``` object (the values must be in the same order and of the same types as the schema of the DataFrame to which you want to append them)


In [10]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

In [11]:
myRow[0] # to access the value

'Hello'

### DataFrame

#### Creating df
1) from a file / raw data source: ```spark.read.format('format').load('path/to/data')```
2) from a set of rows 

In [12]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("Welcome", StringType(), True),
  StructField("None", StringType(), True),
  StructField("number", LongType(), False, metadata={"hello":"world"})
])

myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

+-------+----+------+
|Welcome|None|number|
+-------+----+------+
|  Hello|null|     1|
+-------+----+------+



#### Transforming a Df
To transform a Df we can only manipulate columns (rows singularly are not accessible) and we can use 
1) ```select``` method

2) ```selectExpr``` method

3) functions from the ```pyspark.sql.functions``` package

#### Transforming using SELECT

In [13]:
flights_df.select("DEST_COUNTRY_NAME").show(2) # singular selection

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [14]:
flights_df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2) #multiple selection


+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [15]:
from pyspark.sql.functions import expr, column, col 
flights_df.select(expr("DEST_COUNTRY_NAME"),
                 col("ORIGIN_COUNTRY_NAME"),
#                  column("count")               # column is not working 
                 ).show(5)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
|    United States|            Ireland|
|            Egypt|      United States|
|    United States|              India|
+-----------------+-------------------+
only showing top 5 rows



<b>Not true in PySpark</b>: the book describes mixing Column objects and strings in ```select``` as a common mistake, but here it does not give an error.

In [16]:
from pyspark.sql.functions import expr, column, col 
flights_df.select(
                 col("ORIGIN_COUNTRY_NAME"),"DEST_COUNTRY_NAME"
                 ).show(5)

+-------------------+-----------------+
|ORIGIN_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-------------------+-----------------+
|            Romania|    United States|
|            Croatia|    United States|
|            Ireland|    United States|
|      United States|            Egypt|
|              India|    United States|
+-------------------+-----------------+
only showing top 5 rows



#### Rename columns 

In [17]:
flights_df.select(expr("ORIGIN_COUNTRY_NAME AS origin")).show(2)
flights_df.select(expr("ORIGIN_COUNTRY_NAME").alias("origin2")).show(2)  # NB: the alias is applied INSIDE select

+-------+
| origin|
+-------+
|Romania|
|Croatia|
+-------+
only showing top 2 rows

+-------+
|origin2|
+-------+
|Romania|
|Croatia|
+-------+
only showing top 2 rows



#### Transforming using .selectExpr()
Because ```select``` followed by ```expr``` is such a common pattern, there is a shorthand: ```selectExpr```.

In [18]:
flights_df.selectExpr("ORIGIN_COUNTRY_NAME AS origin").show(2)

+-------+
| origin|
+-------+
|Romania|
|Croatia|
+-------+
only showing top 2 rows



In [19]:
flights_df.selectExpr("*","ORIGIN_COUNTRY_NAME AS origin", "ORIGIN_COUNTRY_NAME = DEST_COUNTRY_NAME as withInCountry").show(2)

+-----------------+-------------------+-----+-------+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| origin|withInCountry|
+-----------------+-------------------+-----+-------+-------------+
|    United States|            Romania|   15|Romania|        false|
|    United States|            Croatia|    1|Croatia|        false|
+-----------------+-------------------+-----+-------+-------------+
only showing top 2 rows



###  Literals
Use ```lit``` to pass an explicit value into Spark as a literal column. It must be imported from ```pyspark.sql.functions```.

In [20]:
from pyspark.sql.functions import lit 
flights_df.select(expr("*"), lit(True)).show(2) # lit is OUTSIDE expr()
flights_df.select(expr("*"), lit(True).alias("True value?")).show(2) # lit is OUTSIDE expr()

#NB a difference with SCALA is that .alias() in Python is like .as() in Scala

+-----------------+-------------------+-----+----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|true|
+-----------------+-------------------+-----+----+
|    United States|            Romania|   15|true|
|    United States|            Croatia|    1|true|
+-----------------+-------------------+-----+----+
only showing top 2 rows

+-----------------+-------------------+-----+-----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|True value?|
+-----------------+-------------------+-----+-----------+
|    United States|            Romania|   15|       true|
|    United States|            Croatia|    1|       true|
+-----------------+-------------------+-----+-----------+
only showing top 2 rows



### Adding Columns with ```withColumn('column_name', expression)``` 

In [21]:
flights_df.withColumn('numberOne', lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [22]:
# withColumn(column_name, expression)
flights_df.withColumn('withInCountry', expr("DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withInCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



### Rename a column ```withColumnRenamed('old_name', 'new_name')```

In [23]:
new_flights_df= flights_df.withColumnRenamed("DEST_COUNTRY_NAME", 'destination')
new_flights_df.show(2)

+-------------+-------------------+-----+
|  destination|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
+-------------+-------------------+-----+
only showing top 2 rows



### Remove columns


In [24]:
new_flights_df=new_flights_df.drop('destination')
new_flights_df.show(2)

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|            Romania|   15|
|            Croatia|    1|
+-------------------+-----+
only showing top 2 rows



### Changing Column Type ```cast('type')```

In [25]:
new_flights_df.withColumn("StringNumber", col('count').cast('string')).show(2)
new_flights_df.withColumn("StringNumber", col('count').cast('string')).printSchema()

+-------------------+-----+------------+
|ORIGIN_COUNTRY_NAME|count|StringNumber|
+-------------------+-----+------------+
|            Romania|   15|          15|
|            Croatia|    1|           1|
+-------------------+-----+------------+
only showing top 2 rows

root
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)
 |-- StringNumber: string (nullable = true)



### Filtering ```where()``` or ```filter()```

There are two methods to perform this operation: you can use where or filter
and they both will perform the same operation and accept the same argument types when used
with DataFrames.

In [26]:
new_flights_df.filter(col("count") < 2 ).show(2)

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|            Croatia|    1|
|          Singapore|    1|
+-------------------+-----+
only showing top 2 rows



In [27]:
new_flights_df.where(col("count") < 2 ).show(2)
new_flights_df.where(expr("count")< 2 ).show(2)
# new_flights_df.where("count" < 2).show(2)  # does not work: Python compares the string "count" with the number 2 and raises a TypeError

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|            Croatia|    1|
|          Singapore|    1|
+-------------------+-----+
only showing top 2 rows

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|            Croatia|    1|
|          Singapore|    1|
+-------------------+-----+
only showing top 2 rows



<b>Avoid putting multiple filters into the same expression.</b> Although this is
possible, it is not always useful, because Spark automatically performs all filtering operations at
the same time regardless of the filter ordering. This means that if you want to specify multiple
AND filters, <b>just chain them sequentially</b>:

In [36]:
new_flights_df.where(col("count") >2).where(col("ORIGIN_COUNTRY_NAME") == "Romania" ).show(7)


+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|            Romania|   15|
+-------------------+-----+



### Getting Unique Rows: df.distinct()

In [39]:
new_flights_df.distinct().count()

220

In [41]:
new_flights_df.distinct().orderBy("count").show(5)

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|          Lithuania|    1|
|          Singapore|    1|
|          Gibraltar|    1|
|           Bulgaria|    1|
|            Namibia|    1|
+-------------------+-----+
only showing top 5 rows



### Filtering by Rows

In [88]:
new_flights_df.take(10)


[Row(ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(ORIGIN_COUNTRY_NAME='India', count=62),
 Row(ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(ORIGIN_COUNTRY_NAME='United States', count=1)]

The ```take()``` method returns a list of Row objects (an Array in Scala). It is an action: it collects the data to the driver (as ```collect``` does).



In [89]:
flights_df.limit(10)


DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [90]:
flights_df.limit(10).show()


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+



The ```limit()``` method returns a new DataFrame. It is a transformation and does not collect the data.


### Random Sample: df.sample()
Use ```sample``` to draw some random records from your DataFrame. Signature: ```sample(withReplacement=None, fraction=None, seed=None)```.
It is not guaranteed to return exactly the specified fraction of the total row count of the given DataFrame.

In [51]:
# in Python
seed = 5   
withReplacement = False #Sample with replacement or not (default False).
fraction = 0.5  # fraction of rows to generate, range [0.0, 1.0]
new_flights_df.sample(withReplacement, fraction, seed).count()

138

In [52]:
new_flights_df.count()

256

In [54]:
new_flights_df.count()*fraction

128.0
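The gap between 138 sampled rows and the expected 128 can be illustrated with a plain-Python sketch. It assumes that sampling without replacement keeps each row independently with probability ```fraction```; the helper ```bernoulli_sample``` is hypothetical and is a model of the idea, not Spark's implementation.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (hypothetical helper)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(256))  # same row count as new_flights_df

# Different seeds give different sample sizes around 256 * 0.5 = 128.
sizes = [len(bernoulli_sample(rows, 0.5, seed)) for seed in range(5)]
print(sizes)

# The same seed always reproduces the same sample.
print(bernoulli_sample(rows, 0.5, 5) == bernoulli_sample(rows, 0.5, 5))
```

Because every row is an independent draw, the sample size is itself random; only its expected value equals ```fraction``` times the row count.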

### Random Splits .randomSplit([0.25, 0.75], seed)
Random splits can be helpful when you need to break up your DataFrame into random “splits” of the original DataFrame. 

Parameters:

weights: list --> list of doubles as weights with which to split the DataFrame. Weights will be normalized if they don’t sum up to 1.0.

seed: int, optional --> the seed for sampling.

In [61]:
dataFrames = new_flights_df.randomSplit([0.25, 0.75], seed)
dataFrames[0].show(3)
dataFrames[0].count()

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|             Angola|   13|
|           Anguilla|   38|
|          Australia|  258|
+-------------------+-----+
only showing top 3 rows



71

In [62]:
dataFrames[1].show(3)
dataFrames[1].count()

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|Antigua and Barbuda|  117|
|          Argentina|  141|
|              Aruba|  342|
+-------------------+-----+
only showing top 3 rows



185
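The weight normalization mentioned in the parameters, and the fact that the split sizes (71 and 185) are only approximately 25% and 75% of 256, can be modelled in plain Python. This is a sketch under the assumption that each row gets one random draw and goes to the split whose cumulative-weight interval contains it; ```normalize``` and ```random_split``` are hypothetical helpers, not Spark functions.

```python
import random
from itertools import accumulate


def normalize(weights):
    """Rescale weights so that they sum to 1.0."""
    total = sum(weights)
    return [w / total for w in weights]


def random_split(rows, weights, seed):
    """Assign each row to exactly one split using cumulative weight boundaries."""
    bounds = [0.0] + list(accumulate(normalize(weights)))
    bounds[-1] = 1.0  # guard against floating-point drift in the last boundary
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # value in [0.0, 1.0)
        for i in range(len(weights)):
            if bounds[i] <= draw < bounds[i + 1]:
                splits[i].append(row)
                break
    return splits


print(normalize([1, 3]))  # [0.25, 0.75]
parts = random_split(list(range(256)), [1, 3], seed=5)
print([len(p) for p in parts])  # two sizes that add up to 256
```

Weights such as ```[1, 3]``` and ```[0.25, 0.75]``` describe the same split after normalization, and every row lands in exactly one split.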

### Concatenating and Appending Rows (Union)

In [71]:
# in Python
from pyspark.sql import Row
schema = flights_df.schema
newRows = [
Row("New Country", "Other Country", 5), Row("New Country 2", "Other Country 3", 1)
]
# Just drop the L suffix: what was long in Python 2 is now the standard int type in Python 3.
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
newDF.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|      New Country|      Other Country|    5|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



In [91]:
reduced = flights_df.limit(2)
reduced.union(newDF).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|      New Country|      Other Country|    5|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



### Sorting Rows

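There are two equivalent operations for sorting, ```sort``` and ```orderBy```, that work the exact same way. They accept both column expressions and strings as
well as multiple columns. The default is to sort in ascending order; the examples below pass ```ascending=False``` to sort in descending order: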

In [97]:
flights_df.orderBy(["ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME"], ascending=False).show()

+--------------------+-------------------+------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+--------------------+-------------------+------+
|       United States|            Vietnam|     2|
|       United States|          Venezuela|   246|
|       United States|            Uruguay|    13|
|              Zambia|      United States|     1|
|           Venezuela|      United States|   290|
|             Uruguay|      United States|    43|
|       United States|      United States|370002|
|      United Kingdom|      United States|  2025|
|United Arab Emirates|      United States|   320|
|             Ukraine|      United States|    14|
|Turks and Caicos ...|      United States|   230|
|              Turkey|      United States|   138|
|             Tunisia|      United States|     3|
| Trinidad and Tobago|      United States|   211|
|         The Bahamas|      United States|   955|
|            Thailand|      United States|     3|
|              Taiwan|      United States|   266|


In [113]:
flights_df.sort(["ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME"], ascending=False).show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Vietnam|    2|
|    United States|          Venezuela|  246|
|    United States|            Uruguay|   13|
+-----------------+-------------------+-----+
only showing top 3 rows



### asc_nulls_first, desc_nulls_first, asc_nulls_last, or desc_nulls_last 

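Use ```asc_nulls_first```, ```desc_nulls_first```, ```asc_nulls_last```, or ```desc_nulls_last``` to specify where you would like your null values to appear in an ordered
DataFrame. For example, ```asc_nulls_first``` returns a sort expression based on ascending order of the column, with null values placed before non-null values.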

In [119]:
# syntax: df.orderBy(df.column.asc_nulls_last())

flights_df.orderBy(flights_df.DEST_COUNTRY_NAME.asc_nulls_last()).show(3)



+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Algeria|      United States|    4|
|           Angola|      United States|   15|
|         Anguilla|      United States|   41|
+-----------------+-------------------+-----+
only showing top 3 rows



### sortWithinPartitions vs orderBy vs sort 

The documentation of ```sortWithinPartitions``` states that it returns a new Dataset with each partition sorted by the given expressions.

The easiest way to think of this function is to imagine a fourth column (the partition id) that is used as the primary sorting criterion. The function ```spark_partition_id()``` returns the partition id of each row.

For example, if you have just one large partition (something that you as a Spark user would never do!), ```sortWithinPartitions``` works as a normal sort:



In [110]:
from pyspark.sql.functions import spark_partition_id
# This is non deterministic because it depends on data partitioning and task scheduling.
#https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.spark_partition_id.html


flights_df.repartition(1)\
        .sortWithinPartitions(["ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME"],\
                                               ascending=False)\
        .withColumn("partition", spark_partition_id()).show()

+--------------------+-------------------+------+---------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|partition|
+--------------------+-------------------+------+---------+
|       United States|            Vietnam|     2|        0|
|       United States|          Venezuela|   246|        0|
|       United States|            Uruguay|    13|        0|
|              Zambia|      United States|     1|        0|
|           Venezuela|      United States|   290|        0|
|             Uruguay|      United States|    43|        0|
|       United States|      United States|370002|        0|
|      United Kingdom|      United States|  2025|        0|
|United Arab Emirates|      United States|   320|        0|
|             Ukraine|      United States|    14|        0|
|Turks and Caicos ...|      United States|   230|        0|
|              Turkey|      United States|   138|        0|
|             Tunisia|      United States|     3|        0|
| Trinidad and Tobago|      United State


If there are more partitions, the results are only sorted within each partition:


Why would one use ```sortWithinPartitions``` instead of ```sort```? 

<b>sortWithinPartitions does not trigger a shuffle</b>, as the data is only moved within the executors. ```sort```, however, will trigger a shuffle. Therefore ```sortWithinPartitions``` executes faster. If the data is partitioned by a meaningful column, sorting within each partition might be enough.
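The difference between sorting within partitions and a global sort can be sketched with plain Python lists standing in for partitions. The sample rows and the two-partition layout are made up for illustration; this is a model of the behaviour, not Spark code.

```python
# Two hypothetical partitions of (ORIGIN_COUNTRY_NAME, count) rows.
partitions = [
    [("Italy", 438), ("Anguilla", 38), ("United States", 42)],
    [("Qatar", 109), ("United States", 666)],
]

# Model of sortWithinPartitions: each partition is sorted on its own,
# so no row has to move to another partition.
sorted_within = [sorted(p, key=lambda r: r[0], reverse=True) for p in partitions]

# Model of sort / orderBy: all rows are compared with each other,
# which in Spark requires moving rows between partitions (a shuffle).
globally_sorted = sorted(
    (r for p in partitions for r in p), key=lambda r: r[0], reverse=True
)

flattened = [r for p in sorted_within for r in p]
print(sorted_within)
print(flattened == globally_sorted)  # False: order holds only inside each partition
```

Reading the partitions one after another therefore does not give a globally ordered result, which matches the output of the multi-partition example.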

In [111]:
flights_df.repartition(90)\
        .sortWithinPartitions(["ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME"],\
                                               ascending=False)\
        .withColumn("partition", spark_partition_id()).show()

+-------------------+-------------------+-----+---------+
|  DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|partition|
+-------------------+-------------------+-----+---------+
|   Marshall Islands|      United States|   42|        0|
|      United States|              Italy|  438|        0|
|      United States|           Anguilla|   38|        0|
|            Jamaica|      United States|  666|        1|
|            Hungary|      United States|    2|        1|
|      United States|              Qatar|  109|        1|
|         Luxembourg|      United States|  155|        2|
|              India|      United States|   61|        2|
|      United States|       Cook Islands|   13|        2|
|     United Kingdom|      United States| 2025|        3|
|           Kiribati|      United States|   26|        3|
|      United States|          Argentina|  141|        3|
|            Uruguay|      United States|   43|        4|
|         Guadeloupe|      United States|   56|        4|
|      French 

### Repartition

Repartition will incur a full shuffle of the data, regardless of whether one is necessary. This means that you should typically only repartition when the future number of partitions is greater than your current number of partitions.


If you know that you’re going to be filtering by a certain column often, it can be worth
repartitioning based on that column:

In [120]:
# in Python
flights_df.repartition(5, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
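Why repartitioning by a column helps later filters on that column can be sketched in plain Python: rows with the same key always land in the same partition. The ```partition_for``` helper and the use of ```zlib.crc32``` are assumptions made for this sketch; Spark uses its own hash function, so the actual partition numbers will differ.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a key to a partition index (hypothetical helper).

    crc32 is only a stable stand-in hash for this sketch.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


rows = [("United States", 15), ("Egypt", 15), ("United States", 1), ("Senegal", 40)]

partitions = {i: [] for i in range(5)}
for dest, count in rows:
    partitions[partition_for(dest, 5)].append((dest, count))

for index, content in partitions.items():
    print(index, content)
```

All rows for a given destination sit together, so a filter on that destination only needs the rows of one partition.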

### Coalesce
Coalesce will not incur a full shuffle and will try to combine partitions. The following operation shuffles the data into five partitions based on the destination country name, and then coalesces them (without a full shuffle):

In [None]:
# in Python
flights_df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

### Collecting Rows to the Driver

In [121]:
# in Python
collectDF = flights_df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()
collectDF.toLocalIterator()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India         

<generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f21ff565510>

In [124]:
collectDF.StorageLevel()

AttributeError: 'DataFrame' object has no attribute 'StorageLevel'

In [28]:
new_flights_df.limit(3).show()

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|            Romania|   15|
|            Croatia|    1|
|            Ireland|  344|
+-------------------+-----+



In [63]:
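# The next line fails: '|' binds tighter than '>' and '==', so each comparison needs its own parentheses.
# new_flights_df.where(col("count") > 2 | col("ORIGIN_COUNTRY_NAME") == "Romania").show(7)
new_flights_df.where((col("count") > 2) | (col("ORIGIN_COUNTRY_NAME") == "Romania")).show(7)
# Use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.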
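The commented-out filter above fails because of Python operator precedence, not because of anything specific to Spark. A small sketch using only the standard ```ast``` module shows how Python groups such an expression; the names ```count``` and ```origin``` are placeholder identifiers standing in for the two column objects.

```python
import ast

# Placeholder names stand in for col("count") and col("ORIGIN_COUNTRY_NAME").
source = 'count > 2 | origin == "Romania"'
tree = ast.parse(source, mode="eval").body

print(type(tree).__name__)                     # Compare
print([type(op).__name__ for op in tree.ops])  # ['Gt', 'Eq']
print(type(tree.comparators[0]).__name__)      # BinOp: (2 | origin) is grouped first

# With explicit parentheses the top-level operation is the '|' itself.
fixed = ast.parse('(count > 2) | (origin == "Romania")', mode="eval").body
print(type(fixed).__name__, type(fixed.op).__name__)  # BinOp BitOr
```

Python reads the unparenthesized version as the chained comparison ```count > (2 | origin) == "Romania"```, which is why each comparison has to be wrapped in parentheses before combining them with ```|``` or ```&```.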


## PART III  Low-Level API 

There are two sets of low-level APIs: 

1) one for manipulating distributed data (RDDs)

2) another for distributing and manipulating distributed shared variables (broadcast variables and accumulators).

### When to use the Low-Level API
Reasons to use the low-level API:

1) need some functionality that you cannot find in the higher-level APIs (ex. control over physical data placement across the cluster).

2) maintain some legacy codebase written using RDDs. (Legacy code is source code inherited from someone else or from an older version of the software.)

3) do some custom shared variable manipulation. [ch. 14]

#### How to use it
A SparkContext is the entry point for low-level API functionality. You access it through the SparkSession, which is the tool you use to perform computation across a Spark cluster. [ch. 15]

```spark.sparkContext```

### Ch 12. Resilient Distributed Datasets
 
An RDD is a fault-tolerant, immutable, partitioned collection of records that can be operated on in parallel. 


In RDDs the records are just Java, Scala, or Python objects of the
programmer’s choosing. You can store anything you want in these objects, in any format you want. This gives great control, but you have to reinvent the wheel and do the optimization yourself.


While in **DataFrames** each record is a structured row containing
fields with a known schema. 

#### Traits of RDD

**Immutable (read only, can't be changed or modified):** Data is safe to share across processes. It can be created or retrieved at any time, which makes caching, sharing and replication easy. It is a way to reach consistency in computations.

**Partitioned:** The partition is the basic unit of parallelism in an RDD. Each partition is a logical division of the data/records.

**Persistence:** You can choose which storage is used, either in-memory or on-disk.

**Cacheable:** It holds data in persistent storage (memory/disk) so that it can be retrieved more quickly on the next request.

**Fault Tolerant:** Resilient means it is able to recover or get back all the data (coarse/fine grained, low overhead) using the lineage graph.

**Actions/Transformations:** All computations on RDDs are either actions or transformations.

**Coarse-grained operations:** Operations such as map, filter, or group by are applied to all elements of the dataset.


#### 5 main RDD properties

These properties determine all of Spark’s ability to schedule and execute the user program:

1) A list of partitions

2) A function for computing each split

3) A list of dependencies on other RDDs

4) Optionally, a Partitioner for key-value RDDs (e.g., to say that the RDD is hash-partitioned)

5) Optionally, a list of preferred locations on which to compute each split (e.g., block locations for a Hadoop Distributed File System [HDFS] file)
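The five properties can be pictured with a toy Python class. Everything here (the ```ToyRDD``` name, its fields, and its methods) is hypothetical and only mirrors the list above; it is not how PySpark implements RDDs.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class ToyRDD:
    partitions: List[list]                                      # 1) a list of partitions
    compute: Callable[[list], list]                             # 2) a function for computing each split
    dependencies: List["ToyRDD"] = field(default_factory=list)  # 3) dependencies on other RDDs
    partitioner: Optional[Callable] = None                      # 4) optional, for key-value data
    preferred_locations: Optional[List[str]] = None             # 5) optional host hints per split

    def map(self, f):
        """Return a new ToyRDD that remembers its parent instead of mutating it."""
        parent = self
        return ToyRDD(
            partitions=self.partitions,
            compute=lambda split: [f(x) for x in parent.compute(split)],
            dependencies=[self],
        )

    def collect(self):
        """Compute every split and gather the results."""
        return [x for split in self.partitions for x in self.compute(split)]


base = ToyRDD(partitions=[[1, 2], [3, 4]], compute=lambda split: list(split))
doubled = base.map(lambda x: x * 2)

print(doubled.collect())                # [2, 4, 6, 8]
print(doubled.dependencies[0] is base)  # True: the lineage points back to the parent
```

Because each derived object keeps a reference to its parent and a function to recompute its splits, a lost split could be rebuilt from the lineage, which is the idea behind the "resilient" in RDD.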
    
#### Comparison Scala, Java, Python 
The RDD APIs are available in Python as well as Scala and Java. For Scala and Java, performance is for the most part the same. Python, however, can lose a substantial amount of performance when using RDDs:
running Python RDDs equates to running Python user-defined functions (UDFs) row by row. The book recommends building on the Structured APIs in Python and only dropping down to
RDDs if absolutely necessary.

### When to use RDDs 

**custom partitioning of data** --> need fine-grained control over the physical distribution of data.
   
   
####  Datasets vs RDDs (Scala and Java)
Datasets can still take advantage of the wealth of functions and optimizations that the Structured APIs have to offer. With Datasets, you do not need to choose between only operating on JVM types or on Spark types, you can choose whatever is either easiest to do or most flexible.

### DOCUMENTATION CODE: https://spark.apache.org/docs/3.1.1/api/python/_modules/pyspark/rdd.html#RDD

### Creating RDDs

You can get RDDs from:

1) existing Dataframes or Datasets (Interoperating between DataFrames, Datasets and RDDs) 
```df.rdd```

2) a local collection 
```spark.sparkContext.parallelize(myCollection, n_partitions)```

3) a Data source 
```spark.sparkContext.textFile('path/to/data')``` --> a record for each line
```spark.sparkContext.wholeTextFiles('path/to/data')``` --> a record for each text file


#### CASE 1: from existing Dataframes or Datasets 

In [57]:
# ex 1 - DataFrame from range
spark.range(12)  # is a DataFrame
rdd = spark.range(12).rdd  # is an RDD

In [13]:
# CASE 1: ex 2 - dataframe from a file
df_from_file = spark.read.format('json').load('./data/flight-data/json/2015-summary.json')
df_from_file  # is a DataFrame
df_from_file.rdd  # is an RDD

MapPartitionsRDD[62] at javaToPython at NativeMethodAccessorImpl.java:0

**To operate on this data, you will need to convert this Row object to the correct data type**

In [30]:
spark.range(13).toDF('new_name_column')

DataFrame[new_name_column: bigint]

In [31]:
spark.range(10).toDF("id").show() #DataFrame[id: bigint] 

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [24]:
spark.range(10).toDF("id").rdd
#MapPartitionsRDD[79] at javaToPython at NativeMethodAccessorImpl.java:0
#NB. 'RDD' object has no attribute 'show'

MapPartitionsRDD[103] at javaToPython at NativeMethodAccessorImpl.java:0

In [25]:
spark.range(10).toDF("id").rdd.map(lambda row: row[0])

PythonRDD[110] at RDD at PythonRDD.scala:53

**Convert back from RDD to DataFrame**

Calling ```.rdd``` on a DataFrame creates an RDD of type Row. This Row is the internal Catalyst format that Spark uses to represent data in the Structured APIs. Calling ```toDF()``` converts such an RDD back into a DataFrame.

In [32]:
spark.range(10).rdd.toDF() # column names come from the Row fields; pass a list such as toDF(['new_column_name']) to rename

DataFrame[id: bigint]

#### CASE 2: from collections

In [33]:
myCollection = "ciao, e ci si prova di nuovo".split(" ")
n_partitions = 2

spark.sparkContext.parallelize(myCollection, n_partitions)

ParallelCollectionRDD[143] at readRDDFromFile at PythonRDD.scala:274

#### Set a name for the RDD to check in UI (https://spark.apache.org/docs/latest/web-ui.html)

In [37]:
words = spark.sparkContext.parallelize(myCollection, n_partitions)

In [38]:
words.setName("RddName")

In [39]:
words.count()

7

#### CASE 3: from Data Source (file(s))

In [40]:
spark.sparkContext.textFile('./data/flight-data/json/2015-summary.json')



./data/flight-data/json/2015-summary.json MapPartitionsRDD[149] at textFile at NativeMethodAccessorImpl.java:0

With ```wholeTextFiles```, each record is a pair: the first element is the name of the file and the second is the whole content of that file as a string.

[('file_name1', 'object_file1'),
('file_name2', 'object_file2'),
...]

In [54]:
rdd_whole_files = spark.sparkContext.wholeTextFiles('./data/flight-data/json')

print(sorted(rdd_whole_files.collect()))

[('file:/home/ail/Spark/data/flight-data/json/2010-summary.json', '{"ORIGIN_COUNTRY_NAME":"Romania","DEST_COUNTRY_NAME":"United States","count":1}\n{"ORIGIN_COUNTRY_NAME":"Ireland","DEST_COUNTRY_NAME":"United States","count":264}\n{"ORIGIN_COUNTRY_NAME":"India","DEST_COUNTRY_NAME":"United States","count":69}\n{"ORIGIN_COUNTRY_NAME":"United States","DEST_COUNTRY_NAME":"Egypt","count":24}\n{"ORIGIN_COUNTRY_NAME":"United States","DEST_COUNTRY_NAME":"Equatorial Guinea","count":1}\n{"ORIGIN_COUNTRY_NAME":"Singapore","DEST_COUNTRY_NAME":"United States","count":25}\n{"ORIGIN_COUNTRY_NAME":"Grenada","DEST_COUNTRY_NAME":"United States","count":54}\n{"ORIGIN_COUNTRY_NAME":"United States","DEST_COUNTRY_NAME":"Costa Rica","count":477}\n{"ORIGIN_COUNTRY_NAME":"United States","DEST_COUNTRY_NAME":"Senegal","count":29}\n{"ORIGIN_COUNTRY_NAME":"Marshall Islands","DEST_COUNTRY_NAME":"United States","count":44}\n{"ORIGIN_COUNTRY_NAME":"United States","DEST_COUNTRY_NAME":"Guyana","count":17}\n{"ORIGIN_COU

In [55]:
## ERRORS 
# Can not infer schema for type: <class 'str'>
# spark.sparkContext.textFile('./data/flight-data/json/2015-summary.json').toDF()



### Manipulating RDDs I : Transformations

documentation: https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

```rdd.distinct()``` --> removes duplicates 

```rdd.filter(lambda x: filter_function(x))```: keeps the elements for which the function returns True

```rdd.map(lambda x: map_function(x))```

```rdd.flatMap(lambda x: map_function(x))```

```rdd.sortBy(lambda x: keyfunc(x))```: sorts this RDD by the given keyfunc

```rdd.randomSplit([0.25,0.75])```

### Method:  ```rdd.sortBy(lambda x: keyfunc)```

In [19]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

# x is one element of the RDD, here a (key, value) tuple; x[0] is the key
spark.sparkContext.parallelize(tmp).sortBy(lambda x: x[0]).collect()

[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

In [20]:
# sort by value

In [21]:
spark.sparkContext.parallelize(tmp).sortBy(lambda x: x[1]).collect()

[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

In [84]:
words = "parola corta rude non si".split(" ")
words = spark.sparkContext.parallelize(words) # transform list to rdd
words.sortBy(lambda word: len(word)* -1).collect() # order by length, longest first

['parola', 'corta', 'rude', 'non', 'si']

### Method:  ```rdd.sortByKey()```

In [69]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
spark.sparkContext.parallelize(tmp).sortByKey() # transformation: returns a new (lazy) RDD, nothing is sorted yet

PythonRDD[197] at RDD at PythonRDD.scala:53

In [78]:
spark.sparkContext.parallelize(tmp).collect() # no sortByKey() here: the source collection keeps its original order

[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

### Method:  ```rdd.randomSplit([weight1,weight2, ..., weightn])```

In [95]:
words.randomSplit([0.25,0.3,0.3]) # weights are normalized if they do not sum to 1

[PythonRDD[295] at RDD at PythonRDD.scala:53,
 PythonRDD[296] at RDD at PythonRDD.scala:53,
 PythonRDD[297] at RDD at PythonRDD.scala:53]

In [99]:
words = "parola corta rude non si".split(" ")
words = spark.sparkContext.parallelize(words) 
element =  words.randomSplit([0.25,0.75])
element[1].collect()

['parola', 'corta', 'rude', 'si']

#### Method ```rdd.map(lambda x: map_function(x))``` vs ```rdd.flatMap(lambda x: map_function(x))```

map: returns a new RDD by applying a function to each element of the RDD. The function produces exactly one output item per input element.

flatMap: like map, it returns a new RDD by applying a function to each element of the RDD, but the function returns a sequence and the output is flattened.

In [66]:
spark.sparkContext.parallelize([3,4,5]).map(lambda x: [x,  x*x]).collect()

[[3, 9], [4, 16], [5, 25]]

In [64]:
spark.sparkContext.parallelize([3,4,5]).flatMap(lambda x:  [x,  x*x]).collect()

[3, 9, 4, 16, 5, 25]

### Manipulating RDDs II : Actions 

documentation: https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

#### Method ```countByValue(self)``` 
Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.

In [None]:
spark.sparkContext.parallelize([3,4,5,3,3,4]).countByValue()

In [None]:
words.countByValue()


#### Method ```countApprox(timeout, confidence=0.95)``` 
Approximate version of count() that returns a potentially incomplete result within a timeout, even if not all tasks have finished.

In [113]:
# rdd = spark.sparkContext.parallelize(range(20000), 10)
# words.countApprox(10, 0.9)  # the first argument (timeout) is in milliseconds


#### Methods ```first()```  ```min()```  ```max()```  ```take()``` 


In [4]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
spark.sparkContext.parallelize(tmp).sortByKey().first()

('1', 3)

In [5]:
spark.sparkContext.parallelize(tmp).sortByKey().min()

('1', 3)

In [6]:
spark.sparkContext.parallelize(tmp).sortByKey().max()

('d', 4)

#### Methods ```take()```   ```takeOrdered()```  ```takeSample()```   ```top()``` 

```takeSample()``` returns a fixed-size random sample from your RDD. You can specify whether to sample with replacement (```withReplacement```), the number of values to take, and the random seed. 

```takeOrdered(n)``` returns the ```n``` smallest elements in their natural ordering; for key-value tuples with distinct keys this is the same as ```.sortByKey().take(n)```.

```top(n)``` is effectively the opposite of ```takeOrdered()``` in that it selects the top values according to the implicit ordering.

In [8]:
spark.sparkContext.parallelize(tmp).take(3)

[('a', 1), ('b', 2), ('1', 3)]

In [7]:
spark.sparkContext.parallelize(tmp).sortByKey().take(3) #takeOrdered()

[('1', 3), ('2', 5), ('a', 1)]

In [9]:
spark.sparkContext.parallelize(tmp).takeOrdered(3) #same as .sortByKey().take(3)

[('1', 3), ('2', 5), ('a', 1)]

In [11]:
withReplacement = True
numberToTake = 3
randomSeed = 23
spark.sparkContext.parallelize(tmp).takeSample(withReplacement, numberToTake, randomSeed)

[('d', 4), ('d', 4), ('2', 5)]

### Save RDDs to Files
#### METHOD ```rdd.saveAsTextFile('path/to/data_folder')``` or  ```repartition(n_rep).saveAsTextFile('path/to/data_folder')```

In [13]:
spark.sparkContext.parallelize(tmp).repartition(5).saveAsTextFile("words_to_text_file")
# this command creates a folder called "words_to_text_file" and writes 5 files (part-xxxxx) 
# in addition to a metadata file _SUCCESS

In [16]:
# force to write to one file  with .repartition(1)
spark.sparkContext.parallelize(tmp).repartition(1).saveAsTextFile("words_to_one_text_file") 

In [None]:
# Spark chooses the number of partitions (no repartition call)
spark.sparkContext.parallelize(tmp).saveAsTextFile("words_to_text_file_spark_choose") 

In [None]:
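from pyspark.sql import Row
flights = spark.sparkContext.textFile('./data/flight-data/csv/2015-summary.csv')
header = flights.first()  # assumption: first line is a header, columns laid out as in the JSON files
flights.filter(lambda line: line != header).map(lambda line: line.split(",")) \
    .map(lambda x: Row(DEST_COUNTRY_NAME=x[0], ORIGIN_COUNTRY_NAME=x[1], count=int(x[2]))).toDF()
# Scala reference for the same pattern:
# flights.map(_.split(",")).map{case Array(a,b,c) => 
# (a,b.toInt,c)}.toDF("name","age","city")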

####  METHOD ```rdd.saveAsObjectFile('path/to/data_folder')``` (ONLY Java and Scala)

A sequenceFile is a flat file consisting of binary key–value pairs. It is extensively used in MapReduce as an input/output format. Spark can write to sequenceFiles using the saveAsObjectFile method or by explicitly writing key–value pairs.

```rdd.saveAsObjectFile('path/to/data_folder')``` writes the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().


In [18]:
spark.sparkContext.parallelize(tmp).saveAsObjectFiles("words_to_saveAsObjectFile") 

AttributeError: 'RDD' object has no attribute 'saveAsObjectFiles'

## Persisting (or caching) a dataset in memory across operations [Ch. 20]

Mark an RDD to be persisted by calling the ```persist()``` or ```cache()``` method on it.


```rdd.persist(storageLevel)```:  Set this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet.

```rdd.cache()```: a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory).



**Storage Level**

    MEMORY_ONLY: 	Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

    MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

    MEMORY_ONLY_SER (Java and Scala): 	Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

    MEMORY_AND_DISK_SER (Java and Scala): Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

    DISK_ONLY: Store the RDD partitions only on disk.

    MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.

    OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.


**Note: In Python** stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level. The available storage levels in Python include MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, DISK_ONLY_2, and DISK_ONLY_3.

### Checkpointing

```rdd.checkpoint()``` Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.

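Typical usage: set the checkpoint directory once with ```spark.sparkContext.setCheckpointDir("/some/path/for/checkpointing")```, then call ```rdd.checkpoint()```. The checkpoint is written when the next action runs on the RDD. From then on, when we reference this RDD, it derives from the checkpoint instead of the source data.
This can be a helpful optimization.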

In [24]:
spark.sparkContext.setCheckpointDir("checkpointing")
rddd_ = spark.sparkContext.parallelize(tmp)

rddd_.checkpoint()

In [25]:
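rddd_.setName("Check")  # only labels the RDD (for example in the Spark UI)
Check.checkpoint()      # fails: setName does not create a Python variable; use rddd_.checkpoint()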


NameError: name 'Check' is not defined