## Overview
This notebook shows how to create a PySpark DataFrame and several ways to view its data.

#### **Contents :**

- **What is a DataFrame in Spark**
- **Setting up Spark Session**
- **Dataframe Creation**
    1. PySpark DataFrame from a list of rows
    2. PySpark DataFrame with an explicit schema
    3. PySpark DataFrame from a pandas DataFrame
    4. Create Empty DataFrame from Empty RDD with Schema (StructType)
    5. Create Empty DataFrame Directly with Schema
    6. Create Empty DataFrame without Schema (no columns)
    7. Convert Empty RDD to DataFrame
- **Viewing Dataframe**
    - Viewing the Dataframe via `df.show()`, `display(df)` and `df.head()`
    - Setting spark conf for eager evaluation
    - Viewing the dataframe row vertically
    - Viewing DataFrame columns, schema and summary
    - Viewing the DataFrame via `DataFrame.collect()`, `DataFrame.take()` and `DataFrame.tail()`
    - Convert PySpark DataFrame into pandas DataFrame
- **DataFrame with SQL**


This is a **Python** notebook, so the default cell type is Python. You can switch languages in a cell with the `%LANGUAGE` magic command: Python (`%python`), Scala (`%scala`), SQL (`%sql`) and R (`%r`) are all supported, and `%fs` gives access to file system commands.

**Spark Dataframe Documentation Link**
- https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#DataFrame-Creation

### What is a DataFrame in Spark
In Spark, DataFrames are distributed collections of data organized into rows and columns. Each column in a DataFrame has a name and an associated type. DataFrames resemble traditional database tables: structured and concise. Conceptually, a DataFrame is like a table in a relational database, but with better optimization techniques behind it.

Spark DataFrames can be created from various sources, such as Hive tables, log tables, external databases, or existing RDDs. DataFrames allow the processing of huge amounts of data.

The **DataFrame** API is Spark's easy-to-use interface for handling structured and unstructured data. Every DataFrame has a blueprint called a **schema**. A schema can contain general-purpose data types, such as string and integer types, as well as types that are specific to Spark, such as the struct type.

---------
##### **Why DataFrames?**
When Apache Spark 1.3 was launched, it came with a new API called DataFrames that resolved the limitations of performance and scaling that occur while using RDDs.

RDDs can run into trouble when memory or disk space is scarce. Spark RDDs also have no concept of a schema (the structure that defines the columns and types of the data), so structured and unstructured data are stored in the same way, which is not very efficient.

RDDs have no built-in optimization engine, so Spark cannot rewrite an RDD computation to make it run more efficiently. RDDs also offer little help in debugging errors at runtime. They store the data as a collection of Java objects.

RDDs rely on serialization (converting an object into a stream of bytes so that it can be stored or sent over the network) and garbage collection (an automatic memory management technique that detects unused objects and frees them from memory). Both are costly for large collections of objects and add to the memory overhead of the system.

DataFrames were introduced to overcome these limitations of Spark RDDs.

#### Setting up Spark Session
PySpark applications start by initializing a `SparkSession`, the entry point of PySpark, as shown below. When running in the PySpark shell via the `pyspark` executable, the shell automatically creates the session in the variable `spark`.

In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# create spark context
# conf = SparkConf().setAppName('MySparkApp').setMaster('local')
# sc = SparkContext.getOrCreate(conf=conf)
# print(conf.get("spark.master"))
# print(conf.get("spark.app.name"))

# OR

# create spark session
spark = SparkSession.builder.appName('MySparkApp').master('local').getOrCreate()

### Dataframe Creation

##### 1. PySpark DataFrame from a list of rows

In [0]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row



In [0]:
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

display(df)
df.printSchema()

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01T12:00:00.000+0000
2,3.0,string2,2000-02-01,2000-01-02T12:00:00.000+0000
4,5.0,string3,2000-03-01,2000-01-03T12:00:00.000+0000


root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



##### 2. PySpark DataFrame with an explicit schema

In [0]:
# Use a separate name so that `df` from section 1 stays available for the viewing examples
df_explicit = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')

df_explicit.printSchema()
display(df_explicit)

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01T12:00:00.000+0000
2,3.0,string2,2000-02-01,2000-01-02T12:00:00.000+0000
3,4.0,string3,2000-03-01,2000-01-03T12:00:00.000+0000


##### 3. PySpark DataFrame from a pandas DataFrame

In [0]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})

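# Use a separate name so that `df` from section 1 stays available for the viewing examples
df_from_pandas = spark.createDataFrame(pandas_df)

df_from_pandas.printSchema()
display(df_from_pandas)


root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)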

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01T12:00:00.000+0000
2,3.0,string2,2000-02-01,2000-01-02T12:00:00.000+0000
3,4.0,string3,2000-03-01,2000-01-03T12:00:00.000+0000


##### 4. Create Empty DataFrame from Empty RDD with Schema (StructType)
To create an empty PySpark DataFrame manually with a schema (column names and data types), first [create a schema using StructType and StructField](https://sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield/).

In [0]:
#Create Schema
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField('firstname', StringType(), True),
                     StructField('middlename', StringType(), True),
                     StructField('lastname', StringType(), True)])
                     
print(schema)

StructType([StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True)])


Now create the empty RDD and pass it to `createDataFrame()` of SparkSession along with the schema for column names & data types.

In [0]:
# Create an empty RDD using parallelize
emptyRDD = spark.sparkContext.parallelize([])
print(emptyRDD)

# Create an empty DataFrame from the empty RDD
# (named df_empty so that `df` from section 1 is not overwritten)
df_empty = spark.createDataFrame(emptyRDD, schema)
df_empty.printSchema()

ParallelCollectionRDD[119] at readRDDFromInputStream at PythonRDD.scala:435
root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)



##### 5. Create Empty DataFrame Directly with Schema

In [0]:
# Create an empty DataFrame directly
# (named df2 so that `df` from section 1 is not overwritten)
df2 = spark.createDataFrame([], schema)
df2.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)



##### 6. Create Empty DataFrame without Schema (no columns)
To create an empty DataFrame without a schema (no columns), create an empty schema and pass it when creating the PySpark DataFrame.

In [0]:
# Create an empty DataFrame with no schema (no columns)
df3 = spark.createDataFrame([], StructType([]))
df3.printSchema()

root



##### 7. Convert Empty RDD to DataFrame
We can also create an empty DataFrame by converting an empty RDD with `toDF()`.

In [0]:
#Convert empty RDD to Dataframe
df1 = emptyRDD.toDF(schema)
df1.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)



### Viewing Data 
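There are several ways to view a PySpark DataFrame. The examples in this section use `df`, the DataFrame created from a list of rows in section 1; if `df` has been reassigned since then, re-run that cell first.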

##### Viewing the Dataframe via `df.show()`, `display(df)` and `df.head()`

In [0]:
df.show(2)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 2 rows



In [0]:
display(df)

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01T12:00:00.000+0000
2,3.0,string2,2000-02-01,2000-01-02T12:00:00.000+0000
4,5.0,string3,2000-03-01,2000-01-03T12:00:00.000+0000


In [0]:
df.head(2)

Out[15]: [Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0))]

##### Setting spark conf for eager evaluation
Alternatively, we can enable the `spark.sql.repl.eagerEval.enabled` configuration so that PySpark DataFrames are evaluated eagerly in notebooks such as Jupyter. The number of rows to show is controlled by the `spark.sql.repl.eagerEval.maxNumRows` configuration.

In [0]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 2)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00


##### Viewing the dataframe row vertically
This is useful when rows are too long to be displayed horizontally.

In [0]:
df.show(2, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
-RECORD 1------------------
 a   | 2                   
 b   | 3.0                 
 c   | string2             
 d   | 2000-02-01          
 e   | 2000-01-02 12:00:00 
only showing top 2 rows



##### Viewing DataFrame columns, schema and summary 

In [0]:
# for viewing columns
df.columns

Out[20]: ['a', 'b', 'c', 'd', 'e']

In [0]:
# for viewing schema
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [0]:
# for viewing summary of dataframe
df.summary().show()

+-------+------------------+------------------+-------+
|summary|                 a|                 b|      c|
+-------+------------------+------------------+-------+
|  count|                 3|                 3|      3|
|   mean|2.3333333333333335|3.3333333333333335|   null|
| stddev|1.5275252316519468|1.5275252316519468|   null|
|    min|                 1|               2.0|string1|
|    25%|                 1|               2.0|   null|
|    50%|                 2|               3.0|   null|
|    75%|                 4|               5.0|   null|
|    max|                 4|               5.0|string3|
+-------+------------------+------------------+-------+



##### Viewing the DataFrame via `DataFrame.collect()`, `DataFrame.take()` and `DataFrame.tail()`
`DataFrame.collect()` collects the distributed data to the driver as local Python data.

Note that this can throw an **out-of-memory error** when the dataset is too large to fit in the driver's memory, because it collects all the data from the executors to the driver.

In [0]:
df.collect()

Out[32]: [Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=4, b=5.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

To avoid an **out-of-memory exception**, use `DataFrame.take()` or `DataFrame.tail()`, which return only the requested number of rows.

In [0]:
df.take(2)

Out[33]: [Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0))]

##### Convert PySpark DataFrame into pandas DataFrame
A PySpark DataFrame can also be converted back to a pandas DataFrame in order to use the pandas API.

Note that `DataFrame.toPandas()` also collects all data to the driver, which can easily cause an **out-of-memory error** when the data is too large to fit in the driver's memory.

In [0]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,4,5.0,string3,2000-03-01,2000-01-03 12:00:00


### DataFrame with SQL
DataFrames and Spark SQL share the same execution engine, so they can be used interchangeably.
For example, we can register a DataFrame as a temporary view and run a SQL query against it.

In [0]:
df_color = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])

df_color.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [0]:
df_color.createOrReplaceTempView('colorTable')
spark.sql('select count(*) from colorTable').show()

+--------+
|count(1)|
+--------+
|       8|
+--------+

