## Absolute basics of PySpark DataFrame
### Apache Spark
[Apache Spark](https://spark.apache.org/) is one of the hottest new trends in the technology domain. It is the framework with probably the **highest potential to realize the fruit of the marriage between Big Data and Machine Learning**. It runs fast (up to 100x faster than traditional [Hadoop MapReduce](https://www.tutorialspoint.com/hadoop/hadoop_mapreduce.htm) for some workloads) due to in-memory operation, offers robust, distributed, fault-tolerant data objects (called [RDDs](https://www.tutorialspoint.com/apache_spark/apache_spark_rdd.htm)), and integrates beautifully with the world of machine learning and graph analytics through supplementary packages like [MLlib](https://spark.apache.org/mllib/) and [GraphX](https://spark.apache.org/graphx/).

Spark is written mostly in Scala, a functional programming language that runs on the JVM and interoperates with Java, so it needs a compatible Java installation on your system. Spark can run on top of Hadoop/HDFS, but it also runs in standalone mode and on other cluster managers. However, for most beginners, Scala is not the first language they learn when venturing into the world of data science. Fortunately, Spark provides a wonderful Python integration, called PySpark, which lets Python programmers interface with the Spark framework and learn how to manipulate data at scale and work with objects and algorithms over a distributed file system.

### DataFrame
In Apache Spark, a DataFrame is a distributed collection of rows under named columns. It is conceptually equivalent to a table in a relational database, an Excel sheet with column headers, or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. A DataFrame also shares some common characteristics with an RDD:

* __Immutable in nature__: Once created, a DataFrame / RDD cannot be changed. Applying a transformation does not modify it; it returns a new DataFrame / RDD.
* __Lazy evaluation__: A transformation is only recorded, and nothing is executed until an action (such as `show()` or `count()`) is performed.
* __Distributed__: Both RDDs and DataFrames are distributed in nature.

### Advantages of the DataFrame

* DataFrames are designed for processing large collections of structured or semi-structured data.
* Observations in a Spark DataFrame are organised under named columns, which helps Apache Spark understand the schema of the DataFrame and optimize the execution plan for queries on it.
* DataFrames in Apache Spark can handle petabytes of data.
* DataFrames support a wide range of data formats and sources.
* The DataFrame API is available in several languages: Python, R, Scala, and Java.


In [1]:
import pyspark

In [2]:
from pyspark import SparkContext
from pyspark.sql import Row

### Create a _SparkSession app_ object

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark1 = SparkSession.builder.appName('Basics').getOrCreate()

### Read in a JSON file and examine

In [5]:
df = spark1.read.json('Data/people.json')

#### Unlike a Pandas DataFrame, it does not display its contents when evaluated in a cell; you only see the schema. This is part of the *lazy evaluation*

In [6]:
df

DataFrame[age: bigint, name: string]

#### You have to call the **`show()`** method to evaluate it, i.e. show it

In [7]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Use **`printSchema()`** to show the schema of the data. Note how tightly it is integrated with the SQL-like framework. You can even see that the schema accepts `null` values because the _nullable_ property is set to `true`.

In [8]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



#### Fortunately a simple **`columns`** attribute exists to get the column names back as a Python list

In [9]:
col_list=df.columns

In [10]:
col_list

['age', 'name']

In [11]:
type(col_list)

list

#### Similar to Pandas, the **`describe`** method is used for the statistical summary

In [12]:
df.describe

<bound method DataFrame.describe of DataFrame[age: bigint, name: string]>

#### But unlike Pandas, calling **`describe()`** alone only returns a DataFrame without displaying the statistics!

In [13]:
df.describe()

DataFrame[summary: string, age: string, name: string]

#### True to the spirit of lazy evaluation, you have to evaluate the resulting DataFrame by calling **`show()`**

In [14]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



#### You can also use the **`summary()`** method for more descriptive statistics, including quartiles

In [15]:
df.summary().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    25%|                19|   null|
|    50%|                19|   null|
|    75%|                30|   null|
|    max|                30|Michael|
+-------+------------------+-------+



### How you can define your own Data Schema

#### Import data types and structure types to build the data schema yourself

In [16]:
from pyspark.sql.types import StructField, IntegerType, StringType, StructType

#### Define your data schema by supplying name and data types to the structure fields you will be importing

In [17]:
data_schema = [StructField('age',IntegerType(),True),
              StructField('name',StringType(),True)]

#### Now create a `StructType` with this schema as its fields

In [18]:
final_struc = StructType(fields=data_schema)

#### Now read in the same old JSON with this new schema 

In [19]:
df = spark1.read.json('Data/people.json',schema=final_struc)

In [20]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Now when you print the schema, you will see that _age_ is read as `int` and not `long`. When inferring the schema by default, Spark had no way of knowing the exact integer type that you wanted for this column, so it went with `long`. This is how you can build your own schema and instruct Spark to read the data accordingly.

In [21]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



### How to grab data from the DataFrame; _Column_ and _Row_ objects

#### What is the type of a single column?

In [22]:
type(df['age'])

pyspark.sql.column.Column

#### But how to extract a single column as a DataFrame? Use **`select()`**

In [23]:
df.select('age')

DataFrame[age: int]

In [24]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



We can pass a list to select multiple columns

In [31]:
df.select(['age','name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### What is a Row object?

Let's grab top two rows using `head`

In [25]:
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

This is a list

In [26]:
type(df.head(2))

list

Let's get the first element off this list

In [27]:
row0=df.head(2)[0]

In [29]:
type(row0)

pyspark.sql.types.Row

#### You can get back a normal Python dictionary from the row object

In [30]:
row0.asDict()

{'age': None, 'name': 'Michael'}

#### Remember that in a Pandas DataFrame, both a column and a row are `pandas.Series` objects. Spark offers separate `Column` and `Row` objects because it works over a distributed file system, where this distinction comes in handy.

### Creating new column

#### You cannot think like Pandas. <span style="color:red;font-size:16pt">The following will produce an error</span>

In [32]:
df['newage']=2*df['age']

TypeError: 'DataFrame' object does not support item assignment

#### Use the **`withColumn()`** method instead
Note the second argument to the function. You have to pass an expression built from `df['age']`, i.e. a `pyspark.sql.column.Column` object

In [33]:
df.withColumn('double_age',df['age']*2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



This operation does not affect the original DataFrame, i.e. it is not an *in-place* operation

In [34]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Just for renaming, use **`withColumnRenamed()`** method

In [35]:
df.withColumnRenamed('age','my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



#### You can perform operations on multiple columns, like a vector sum

In [36]:
df2=df.withColumn('half_age',df['age']/2)
df2.show()

+----+-------+--------+
| age|   name|half_age|
+----+-------+--------+
|null|Michael|    null|
|  30|   Andy|    15.0|
|  19| Justin|     9.5|
+----+-------+--------+



In [37]:
df2=df2.withColumn('new_age',df2['age']+df2['half_age'])
df2.show()

+----+-------+--------+-------+
| age|   name|half_age|new_age|
+----+-------+--------+-------+
|null|Michael|    null|   null|
|  30|   Andy|    15.0|   45.0|
|  19| Justin|     9.5|   28.5|
+----+-------+--------+-------+



#### Now if you print the schema, you will see that the data types of _half_age_ and _new_age_ are automatically set to `double` (due to the floating point operations performed)

In [38]:
df2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- half_age: double (nullable = true)
 |-- new_age: double (nullable = true)



#### DataFrame is immutable and there is no `inplace` choice like Pandas! So the original DataFrame has not changed

In [39]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Integration with SparkSQL - Run SQL query!
You may be wondering why the `SparkSession` object is imported from the `pyspark.sql` module. That is because it is tightly integrated with SparkSQL and is designed to work seamlessly with SQL or SQL-like queries for data analytics.

#### It is good to create a temporary view of the DataFrame. Here `people` is the name of the SQL table view.

In [40]:
df.createOrReplaceTempView('people')

#### Now we can run any SQL query directly on this view! It returns a DataFrame.

In [41]:
result = spark1.sql("SELECT * FROM people")
result

DataFrame[age: int, name: string]

In [42]:
result.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Slightly more complex query

In [43]:
result_over_25 = spark1.sql("SELECT * FROM people WHERE age > 25")
result_over_25.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



#### Lifetime of the temp view
The lifetime of this temporary view is tied to the `SparkSession` that was used to create the DataFrame. Once the session is stopped, queries against the view fail, as the cells below show.

In [45]:
spark1.stop() # Stopping the session and underlying SparkContext

In [46]:
result_over_25 = spark1.sql("SELECT * FROM people WHERE age > 25")

In [47]:
result_over_25.show()

Py4JJavaError: An error occurred while calling o91.showString.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:99)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1479)
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.buildReader(JsonFileFormat.scala:98)
	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:160)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:295)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:293)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:313)
	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.BaseLimitExec$class.inputRDDs(limit.scala:62)
	at org.apache.spark.sql.execution.LocalLimitExec.inputRDDs(limit.scala:97)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
