<img src="images/spark.png" alt="drawing" width="200"/>

# Introduction to Spark DataFrames

A DataFrame is a two-dimensional labeled data structure with columns of potentially different types. You can think of a DataFrame like a spreadsheet, a SQL table, or a dictionary of series objects. Apache Spark DataFrames provide a rich set of functions (select columns, filter, join, aggregate) that allow you to solve common data analysis problems efficiently.

Apache Spark DataFrames are an abstraction built on top of Resilient Distributed Datasets (RDDs). Spark DataFrames and Spark SQL use a unified planning and optimization engine, allowing you to get nearly identical performance across all supported languages (Python, SQL, Scala, and R).




In [1]:
import pyspark
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.master("local[1]").appName('SparkDataFrame').getOrCreate()

# Get the Spark context
sc = spark.sparkContext


print("APP Name :" + sc.appName)
print("Master   :" + sc.master)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/10 18:49:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
APP Name :SparkDataFrame
Master   :local[1]


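## Create a DataFrame from an RDD

Most Apache Spark queries return a DataFrame. This includes reading from a table, loading data from files, and operations that transform data. A DataFrame can also be built directly from an RDD of tuples by passing the column names as the schema, as in the following example.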



In [2]:


iphones_RDD = sc.parallelize([
    ("XS", 2018, 5.65, 2.79, 6.24),
    ("XR", 2018, 5.94, 2.98, 6.84),
    ("X10", 2017, 5.65, 2.79, 6.13),
    ("8Plus", 2017, 6.23, 3.07, 7.12),
])

names = ['Model', 'Year', 'Height', 'Width', 'Weight']

iphones_df = spark.createDataFrame(iphones_RDD, schema=names)
iphones_df.show()




                                                                                

+-----+----+------+-----+------+
|Model|Year|Height|Width|Weight|
+-----+----+------+-----+------+
|   XS|2018|  5.65| 2.79|  6.24|
|   XR|2018|  5.94| 2.98|  6.84|
|  X10|2017|  5.65| 2.79|  6.13|
|8Plus|2017|  6.23| 3.07|  7.12|
+-----+----+------+-----+------+



## Create a DataFrame from pandas

You can also create a Spark DataFrame from a list or a pandas DataFrame, such as in the following example:

In [3]:
import pandas as pd

data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]]

pdf = pd.DataFrame(data, columns=["i", "n"])

df1 = spark.createDataFrame(pdf)
df1.show()

df2 = spark.createDataFrame(data, schema="id LONG, name STRING")
df2.show()

+---+----+
|  i|   n|
+---+----+
|  1|Elia|
|  2| Teo|
|  3|Fang|
+---+----+

+---+----+
| id|name|
+---+----+
|  1|Elia|
|  2| Teo|
|  3|Fang|
+---+----+



## Load data into a DataFrame from files

You can load data from many supported file formats. The following example reads a CSV file from the `files` directory, taking the column names from the header row and inferring the column types.

In [4]:
# CSV format
df_csv = spark.read.csv("files/ratings.csv", header=True, inferSchema=True)

df_csv.tail(3)

                                                                                

[Row(userId=5987, movieId=70, rating=5.0, timestamp=1382563110),
 Row(userId=5987, movieId=150, rating=2.5, timestamp=1382564911),
 Row(userId=5987, movieId=152, rating=5.0, timestamp=1382756733)]

Load data from a JSON file. By default, `spark.read.json` expects one JSON object per line (JSON Lines):

In [7]:

ratingDF = spark.read.json("files/ratings.json")

# The inferred schema can be visualized using the printSchema() method
ratingDF.printSchema()

ratingDF.show(5)

                                                                                

root
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- userId: long (nullable = true)

+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|    296|   5.0|1147880044|     1|
|    306|   3.5|1147868817|     1|
|    307|   5.0|1147868828|     1|
|    665|   5.0|1147878820|     1|
|    899|   3.5|1147868510|     1|
+-------+------+----------+------+
only showing top 5 rows



## DataFrame Operations

Like RDDs, Spark DataFrames support two types of operations: transformations and actions.

* Transformations create a new dataset from an existing one.
* Actions return a value to the driver program after running a computation on the dataset.

For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

<img src="images/lazyTranformation.png" alt="drawing" width="900"/>

In the figure above, an RDD is first created by reading data from stable storage, two transformations are then applied to it, and finally an action is performed to get the result.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.

### Examples of Transformations and Actions

#### select( ) - Transformation subsets the columns in the DataFrame
#### show( ) - Action prints the first 20 rows of the DataFrame by default

In [13]:
ratingDF = spark.read.json("files/ratings.json")

rating_value_df = ratingDF.select("rating")

rating_value_df.show(5)

                                                                                

+------+
|rating|
+------+
|   5.0|
|   3.5|
|   5.0|
|   5.0|
|   3.5|
+------+
only showing top 5 rows



#### filter( ) - Transformation keeps only the rows that satisfy a condition

In [16]:
ratingDF = spark.read.json("files/ratings.json")

new_df_rating_3 = ratingDF.filter(ratingDF.rating > 3.0)

new_df_rating_3.show(5)

                                                                                

+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|    296|   5.0|1147880044|     1|
|    306|   3.5|1147868817|     1|
|    307|   5.0|1147868828|     1|
|    665|   5.0|1147878820|     1|
|    899|   3.5|1147868510|     1|
+-------+------+----------+------+
only showing top 5 rows



#### flatMap( ) - Transformation returns zero or more values for each element in the original RDD

In [12]:
RDD = sc.parallelize(["hello word", "How are you"])
RDD_flatMap = RDD.flatMap(lambda x : x.split(" "))
print("RDD_flatMap: ", RDD_flatMap.collect())  # collect() is an action that returns a list

RDD_flatMap:  ['hello', 'word', 'How', 'are', 'you']


#### union( ) - Transformation returns the union of this RDD and another one

In [13]:
rdd01 = sc.parallelize([1, 3, 5, 7])
rdd02 = sc.parallelize([2, 4, 6, 8])
rdd03 = rdd01.union(rdd02)
rdd03.collect()

[1, 3, 5, 7, 2, 4, 6, 8]

### Actions

#### collect( ) - Returns a list that contains all of the elements in this RDD.
Note: This method should only be used if the resulting list is expected to be small, as all the data is loaded into the driver’s memory.

In [14]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

newData = rdd.collect()
for d in newData:
    print (f"Value: {d}")


Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12


#### take(num) – Take the first num elements of the RDD.

In [15]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

newData = rdd.take(2)
for d in newData:
    print (f"Value: {d}")

Value: 1
Value: 2


#### first( ) – Returns the first record of the RDD

In [16]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

newData = rdd.first()
print (f"Value: {newData}")

Value: 1


#### count( ) – Returns the number of records in an RDD

In [17]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

num = rdd.count()
print (f"Count: {num}")

Count: 12


#### max( ) – Returns max record

In [18]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

num = rdd.max()
print (f"Max: {num}")

Max: 12


#### reduce( ) – Combines the records into a single value using a function; we can use this to count or sum.

In [19]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

num = rdd.reduce(lambda a, b: a + b)
print(f"Sum: {num}")

Sum: 78


## Pair RDDs

Spark pair RDDs are RDDs whose elements are key-value pairs. A key-value pair (KVP) consists of two linked data items: the key is the identifier, and the value is the data associated with that key.

### Creating Pair RDDs

Two common ways to create a pair RDD:
 * From a list of key-value tuples
 * From a regular RDD

#### Create a Pair RDD from a regular RDD

In [20]:

rdd = sc.parallelize(["b", "a", "c"])
sorted(rdd.map(lambda x: (x, 1)).collect())

[('a', 1), ('b', 1), ('c', 1)]

#### Create a Pair RDD from a list

In [21]:
rdd = sc.parallelize([(1,"a"), (2,"b"), (3,"c")])
rdd.collect()

[(1, 'a'), (2, 'b'), (3, 'c')]

### Transformations on pair RDDs
All regular transformations work on pair RDDs. You have to pass functions that operate on key-value pairs rather than on individual elements.

#### reduceByKey(func) - Combines the values of each key using the given function.

In [22]:
rdd = sc.parallelize([("a",1), ("b",2), ("c", 10),("a", 2), ("d", 5), ("a", 4) ])
rdd_reduceByKey = rdd.reduceByKey(lambda x, y: x+y )
rdd_reduceByKey.collect()

                                                                                

[('a', 7), ('b', 2), ('c', 10), ('d', 5)]

#### sortByKey( ) - Orders a pair RDD by key.

In [23]:
rdd = sc.parallelize([("a",1), ("c",2), ("b", 10),("a", 2), ("d", 5), ("a", 4) ])
rdd_reduceByKey = rdd.reduceByKey(lambda x, y: x+y )
rdd_reduceByKey.sortByKey(ascending = True).collect()

[('a', 7), ('b', 10), ('c', 2), ('d', 5)]

#### groupByKey( ) - Groups all the values with the same key in the pair RDD

In [24]:
rdd = sc.parallelize([("a",1), ("c",2), ("b", 10),("a", 2), ("d", 5), ("a", 4) ])
rdd_groupByKey = rdd.groupByKey().collect()
for letter, value in  rdd_groupByKey:
    print (letter, list(value))

a [1, 2, 4]
c [2]
b [10]
d [5]


#### join( ) - Transformation joins two pair RDDs based on their key

In [25]:
rdd01 = sc.parallelize([("a",1), ("b", 5),("c", 7) ])
rdd02 = sc.parallelize([("a",2), ("b", 3),("d", 4) ])

rdd01.join(rdd02).collect()


[('b', (5, 3)), ('a', (1, 2))]

#### countByKey( ) - action counts the number of elements for each key

In [26]:
rdd = sc.parallelize([("a",2), ("b", 4),("a", 3) ])
for key, val in  rdd.countByKey().items():
    print (key, val)

a 2
b 1


#### collectAsMap( ) - Action returns the key-value pairs in the RDD as a dictionary

In [27]:
rdd = sc.parallelize([("a",2), ("b", 4),("c", 3) ])
rdd.collectAsMap()

{'a': 2, 'b': 4, 'c': 3}